diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index bedc3c6c52..3ccb6f8a61 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -39,6 +39,8 @@ from ._base import SQLBaseStore
from synapse.api.errors import SynapseError
from synapse.util.logutils import log_function
+from collections import namedtuple
+
import logging
@@ -52,91 +54,79 @@ _STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological"
-def _parse_stream_token(string):
- try:
- if string[0] != 's':
- raise
- return int(string[1:])
- except:
- raise SynapseError(400, "Invalid token")
-
-
-def _parse_topological_token(string):
- try:
- if string[0] != 't':
- raise
- parts = string[1:].split('-', 1)
- return (int(parts[0]), int(parts[1]))
- except:
- raise SynapseError(400, "Invalid token")
-
-
-def is_stream_token(string):
- try:
- _parse_stream_token(string)
- return True
- except:
- return False
-
-
-def is_topological_token(string):
- try:
- _parse_topological_token(string)
- return True
- except:
- return False
-
-
-def _get_token_bound(token, comparison):
- try:
- s = _parse_stream_token(token)
- return "%s %s %d" % ("stream_ordering", comparison, s)
- except:
- pass
-
- try:
- top, stream = _parse_topological_token(token)
- return "%s %s %d AND %s %s %d" % (
- "topological_ordering", comparison, top,
- "stream_ordering", comparison, stream,
- )
- except:
- pass
-
- raise SynapseError(400, "Invalid token")
-
-
-class StreamStore(SQLBaseStore):
- @log_function
- def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
- direction='f', with_feedback=False):
- # We deal with events request in two different ways depending on if
- # this looks like an /events request or a pagination request.
- is_events = (
- direction == 'f'
- and user_id
- and is_stream_token(from_key)
- and to_key and is_stream_token(to_key)
- )
+class _StreamToken(namedtuple("_StreamToken", "topological stream")):
+ """Tokens are positions between events. The token "s1" comes after event 1.
+
+ s0 s1
+ | |
+ [0] V [1] V [2]
+
+ Tokens can either be a point in the live event stream or a cursor going
+ through historic events.
+
+ When traversing the live event stream events are ordered by when they
+ arrived at the homeserver.
+
+ When traversing historic events the events are ordered by their depth in
+ the event graph "topological_ordering" and then by when they arrived at the
+ homeserver "stream_ordering".
+
+ Live tokens start with an "s" followed by the "stream_ordering" id of the
+ event it comes after. Historic tokens start with a "t" followed by the
+ "topological_ordering" id of the event it comes after, follewed by "-",
+ followed by the "stream_ordering" id of the event it comes after.
+ """
+ __slots__ = []
+
+ @classmethod
+ def parse(cls, string):
+ try:
+ if string[0] == 's':
+ return cls(topological=None, stream=int(string[1:]))
+ if string[0] == 't':
+ parts = string[1:].split('-', 1)
+ return cls(topological=int(parts[0]), stream=int(parts[1]))
+ except:
+ pass
+ raise SynapseError(400, "Invalid token %r" % (string,))
+
+ @classmethod
+ def parse_stream_token(cls, string):
+ try:
+ if string[0] == 's':
+ return cls(topological=None, stream=int(string[1:]))
+ except:
+ pass
+ raise SynapseError(400, "Invalid token %r" % (string,))
+
+ def __str__(self):
+ if self.topological is not None:
+ return "t%d-%d" % (self.topological, self.stream)
+ else:
+ return "s%d" % (self.stream,)
- if is_events:
- return self.get_room_events_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key,
- room_id=room_id,
- limit=limit,
- with_feedback=with_feedback,
+ def lower_bound(self):
+ if self.topological is None:
+ return "(%d < %s)" % (self.stream, "stream_ordering")
+ else:
+ return "(%d < %s OR (%d == %s AND %d < %s))" % (
+ self.topological, "topological_ordering",
+ self.topological, "topological_ordering",
+ self.stream, "stream_ordering",
)
+
+ def upper_bound(self):
+ if self.topological is None:
+ return "(%d >= %s)" % (self.stream, "stream_ordering")
else:
- return self.paginate_room_events(
- from_key=from_key,
- to_key=to_key,
- room_id=room_id,
- limit=limit,
- with_feedback=with_feedback,
+ return "(%d > %s OR (%d == %s AND %d >= %s))" % (
+ self.topological, "topological_ordering",
+ self.topological, "topological_ordering",
+ self.stream, "stream_ordering",
)
+
+class StreamStore(SQLBaseStore):
@log_function
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
limit=0, with_feedback=False):
@@ -162,8 +152,8 @@ class StreamStore(SQLBaseStore):
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering.
- from_id = _parse_stream_token(from_key)
- to_id = _parse_stream_token(to_key)
+ from_id = _StreamToken.parse_stream_token(from_key)
+ to_id = _StreamToken.parse_stream_token(to_key)
if from_key == to_key:
return defer.succeed(([], to_key))
@@ -181,7 +171,7 @@ class StreamStore(SQLBaseStore):
}
def f(txn):
- txn.execute(sql, (user_id, user_id, from_id, to_id,))
+ txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,))
rows = self.cursor_to_dict(txn)
@@ -191,8 +181,11 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
+ self._set_before_and_after(ret, rows)
+
if rows:
key = "s%d" % max([r["stream_ordering"] for r in rows])
+
else:
# Assume we didn't get anything because there was nothing to
# get.
@@ -211,17 +204,21 @@ class StreamStore(SQLBaseStore):
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
- from_comp = '<=' if direction == 'b' else '>'
- to_comp = '>' if direction == 'b' else '<='
- order = "DESC" if direction == 'b' else "ASC"
-
args = [room_id]
-
- bounds = _get_token_bound(from_key, from_comp)
- if to_key:
- bounds = "%s AND %s" % (
- bounds, _get_token_bound(to_key, to_comp)
- )
+ if direction == 'b':
+ order = "DESC"
+ bounds = _StreamToken.parse(from_key).upper_bound()
+ if to_key:
+ bounds = "%s AND %s" % (
+ bounds, _StreamToken.parse(to_key).lower_bound()
+ )
+ else:
+ order = "ASC"
+ bounds = _StreamToken.parse(from_key).lower_bound()
+ if to_key:
+ bounds = "%s AND %s" % (
+ bounds, _StreamToken.parse(to_key).upper_bound()
+ )
if int(limit) > 0:
args.append(int(limit))
@@ -249,9 +246,13 @@ class StreamStore(SQLBaseStore):
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
- topo -= 1
+ # Tokens are positions between events.
+ # This token points *after* the last event in the chunk.
+ # We need it to point to the event before it in the chunk
+ # when we are going backwards so we subtract one from the
+ # stream part.
toke -= 1
- next_token = "t%s-%s" % (topo, toke)
+ next_token = str(_StreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
@@ -262,35 +263,62 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
+ self._set_before_and_after(events, rows)
+
return events, next_token,
return self.runInteraction("paginate_room_events", f)
def get_recent_events_for_room(self, room_id, limit, end_token,
- with_feedback=False):
+ with_feedback=False, from_token=None):
# TODO (erikj): Handle compressed feedback
- sql = (
- "SELECT stream_ordering, topological_ordering, event_id FROM events "
- "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
- "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
- )
+ end_token = _StreamToken.parse_stream_token(end_token)
- def f(txn):
- txn.execute(sql, (room_id, end_token, limit,))
+ if from_token is None:
+ sql = (
+ "SELECT stream_ordering, topological_ordering, event_id"
+ " FROM events"
+ " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC"
+ " LIMIT ?"
+ )
+ else:
+ from_token = _StreamToken.parse_stream_token(from_token)
+ sql = (
+ "SELECT stream_ordering, topological_ordering, event_id"
+ " FROM events"
+ " WHERE room_id = ? AND stream_ordering > ?"
+ " AND stream_ordering <= ? AND outlier = 0"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC"
+ " LIMIT ?"
+ )
+
+ def get_recent_events_for_room_txn(txn):
+ if from_token is None:
+ txn.execute(sql, (room_id, end_token.stream, limit,))
+ else:
+ txn.execute(sql, (
+ room_id, from_token.stream, end_token.stream, limit
+ ))
rows = self.cursor_to_dict(txn)
rows.reverse() # As we selected with reverse ordering
if rows:
+ # Tokens are positions between events.
+ # This token points *after* the last event in the chunk.
+ # We need it to point to the event before it in the chunk
+ # since we are going backwards so we subtract one from the
+ # stream part.
topo = rows[0]["topological_ordering"]
- toke = rows[0]["stream_ordering"]
- start_token = "t%s-%s" % (topo, toke)
+ toke = rows[0]["stream_ordering"] - 1
+ start_token = str(_StreamToken(topo, toke))
- token = (start_token, end_token)
+ token = (start_token, str(end_token))
else:
- token = (end_token, end_token)
+ token = (str(end_token), str(end_token))
events = self._get_events_txn(
txn,
@@ -298,9 +326,13 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
+ self._set_before_and_after(events, rows)
+
return events, token
- return self.runInteraction("get_recent_events_for_room", f)
+ return self.runInteraction(
+ "get_recent_events_for_room", get_recent_events_for_room_txn
+ )
def get_room_events_max_id(self):
return self.runInteraction(
@@ -322,3 +354,12 @@ class StreamStore(SQLBaseStore):
key = res[0]["m"]
return "s%d" % (key,)
+
+ @staticmethod
+ def _set_before_and_after(events, rows):
+ for event, row in zip(events, rows):
+ stream = row["stream_ordering"]
+ topo = event.depth
+ internal = event.internal_metadata
+ internal.before = str(_StreamToken(topo, stream - 1))
+ internal.after = str(_StreamToken(topo, stream))
|