diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/stream.py | 257 |
1 files changed, 149 insertions, 108 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index bedc3c6c52..3ccb6f8a61 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,6 +39,8 @@ from ._base import SQLBaseStore from synapse.api.errors import SynapseError from synapse.util.logutils import log_function +from collections import namedtuple + import logging @@ -52,91 +54,79 @@ _STREAM_TOKEN = "stream" _TOPOLOGICAL_TOKEN = "topological" -def _parse_stream_token(string): - try: - if string[0] != 's': - raise - return int(string[1:]) - except: - raise SynapseError(400, "Invalid token") - - -def _parse_topological_token(string): - try: - if string[0] != 't': - raise - parts = string[1:].split('-', 1) - return (int(parts[0]), int(parts[1])) - except: - raise SynapseError(400, "Invalid token") - - -def is_stream_token(string): - try: - _parse_stream_token(string) - return True - except: - return False - - -def is_topological_token(string): - try: - _parse_topological_token(string) - return True - except: - return False - - -def _get_token_bound(token, comparison): - try: - s = _parse_stream_token(token) - return "%s %s %d" % ("stream_ordering", comparison, s) - except: - pass - - try: - top, stream = _parse_topological_token(token) - return "%s %s %d AND %s %s %d" % ( - "topological_ordering", comparison, top, - "stream_ordering", comparison, stream, - ) - except: - pass - - raise SynapseError(400, "Invalid token") - - -class StreamStore(SQLBaseStore): - @log_function - def get_room_events(self, user_id, from_key, to_key, room_id, limit=0, - direction='f', with_feedback=False): - # We deal with events request in two different ways depending on if - # this looks like an /events request or a pagination request. - is_events = ( - direction == 'f' - and user_id - and is_stream_token(from_key) - and to_key and is_stream_token(to_key) - ) +class _StreamToken(namedtuple("_StreamToken", "topological stream")): + """Tokens are positions between events. The token "s1" comes after event 1. + + s0 s1 + | | + [0] V [1] V [2] + + Tokens can either be a point in the live event stream or a cursor going + through historic events. + + When traversing the live event stream events are ordered by when they + arrived at the homeserver. + + When traversing historic events the events are ordered by their depth in + the event graph "topological_ordering" and then by when they arrived at the + homeserver "stream_ordering". + + Live tokens start with an "s" followed by the "stream_ordering" id of the + event it comes after. Historic tokens start with a "t" followed by the + "topological_ordering" id of the event it comes after, follewed by "-", + followed by the "stream_ordering" id of the event it comes after. + """ + __slots__ = [] + + @classmethod + def parse(cls, string): + try: + if string[0] == 's': + return cls(topological=None, stream=int(string[1:])) + if string[0] == 't': + parts = string[1:].split('-', 1) + return cls(topological=int(parts[0]), stream=int(parts[1])) + except: + pass + raise SynapseError(400, "Invalid token %r" % (string,)) + + @classmethod + def parse_stream_token(cls, string): + try: + if string[0] == 's': + return cls(topological=None, stream=int(string[1:])) + except: + pass + raise SynapseError(400, "Invalid token %r" % (string,)) + + def __str__(self): + if self.topological is not None: + return "t%d-%d" % (self.topological, self.stream) + else: + return "s%d" % (self.stream,) - if is_events: - return self.get_room_events_stream( - user_id=user_id, - from_key=from_key, - to_key=to_key, - room_id=room_id, - limit=limit, - with_feedback=with_feedback, + def lower_bound(self): + if self.topological is None: + return "(%d < %s)" % (self.stream, "stream_ordering") + else: + return "(%d < %s OR (%d == %s AND %d < %s))" % ( + self.topological, "topological_ordering", + self.topological, "topological_ordering", + self.stream, "stream_ordering", ) + + def upper_bound(self): + if self.topological is None: + return "(%d >= %s)" % (self.stream, "stream_ordering") else: - return self.paginate_room_events( - from_key=from_key, - to_key=to_key, - room_id=room_id, - limit=limit, - with_feedback=with_feedback, + return "(%d > %s OR (%d == %s AND %d >= %s))" % ( + self.topological, "topological_ordering", + self.topological, "topological_ordering", + self.stream, "stream_ordering", ) + +class StreamStore(SQLBaseStore): @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): @@ -162,8 +152,8 @@ class StreamStore(SQLBaseStore): limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - from_id = _parse_stream_token(from_key) - to_id = _parse_stream_token(to_key) + from_id = _StreamToken.parse_stream_token(from_key) + to_id = _StreamToken.parse_stream_token(to_key) if from_key == to_key: return defer.succeed(([], to_key)) @@ -181,7 +171,7 @@ class StreamStore(SQLBaseStore): } def f(txn): - txn.execute(sql, (user_id, user_id, from_id, to_id,)) + txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,)) rows = self.cursor_to_dict(txn) @@ -191,8 +181,11 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + self._set_before_and_after(ret, rows) + if rows: key = "s%d" % max([r["stream_ordering"] for r in rows]) + else: # Assume we didn't get anything because there was nothing to # get. @@ -211,17 +204,21 @@ class StreamStore(SQLBaseStore): # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. - from_comp = '<=' if direction == 'b' else '>' - to_comp = '>' if direction == 'b' else '<=' - order = "DESC" if direction == 'b' else "ASC" - args = [room_id] - - bounds = _get_token_bound(from_key, from_comp) - if to_key: - bounds = "%s AND %s" % ( - bounds, _get_token_bound(to_key, to_comp) - ) + if direction == 'b': + order = "DESC" + bounds = _StreamToken.parse(from_key).upper_bound() + if to_key: + bounds = "%s AND %s" % ( + bounds, _StreamToken.parse(to_key).lower_bound() + ) + else: + order = "ASC" + bounds = _StreamToken.parse(from_key).lower_bound() + if to_key: + bounds = "%s AND %s" % ( + bounds, _StreamToken.parse(to_key).upper_bound() + ) if int(limit) > 0: args.append(int(limit)) @@ -249,9 +246,13 @@ class StreamStore(SQLBaseStore): topo = rows[-1]["topological_ordering"] toke = rows[-1]["stream_ordering"] if direction == 'b': - topo -= 1 + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # when we are going backwards so we subtract one from the + # stream part. toke -= 1 - next_token = "t%s-%s" % (topo, toke) + next_token = str(_StreamToken(topo, toke)) else: # TODO (erikj): We should work out what to do here instead. next_token = to_key if to_key else from_key @@ -262,35 +263,62 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + self._set_before_and_after(events, rows) + return events, next_token, return self.runInteraction("paginate_room_events", f) def get_recent_events_for_room(self, room_id, limit, end_token, - with_feedback=False): + with_feedback=False, from_token=None): # TODO (erikj): Handle compressed feedback - sql = ( - "SELECT stream_ordering, topological_ordering, event_id FROM events " - "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 " - "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " - ) + end_token = _StreamToken.parse_stream_token(end_token) - def f(txn): - txn.execute(sql, (room_id, end_token, limit,)) + if from_token is None: + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" + ) + else: + from_token = _StreamToken.parse_stream_token(from_token) + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering > ?" + " AND stream_ordering <= ? AND outlier = 0" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" + ) + + def get_recent_events_for_room_txn(txn): + if from_token is None: + txn.execute(sql, (room_id, end_token.stream, limit,)) + else: + txn.execute(sql, ( + room_id, from_token.stream, end_token.stream, limit + )) rows = self.cursor_to_dict(txn) rows.reverse() # As we selected with reverse ordering if rows: + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # since we are going backwards so we subtract one from the + # stream part. topo = rows[0]["topological_ordering"] - toke = rows[0]["stream_ordering"] - start_token = "t%s-%s" % (topo, toke) + toke = rows[0]["stream_ordering"] - 1 + start_token = str(_StreamToken(topo, toke)) - token = (start_token, end_token) + token = (start_token, str(end_token)) else: - token = (end_token, end_token) + token = (str(end_token), str(end_token)) events = self._get_events_txn( txn, @@ -298,9 +326,13 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + self._set_before_and_after(events, rows) + return events, token - return self.runInteraction("get_recent_events_for_room", f) + return self.runInteraction( + "get_recent_events_for_room", get_recent_events_for_room_txn + ) def get_room_events_max_id(self): return self.runInteraction( @@ -322,3 +354,12 @@ class StreamStore(SQLBaseStore): key = res[0]["m"] return "s%d" % (key,) + + @staticmethod + def _set_before_and_after(events, rows): + for event, row in zip(events, rows): + stream = row["stream_ordering"] + topo = event.depth + internal = event.internal_metadata + internal.before = str(_StreamToken(topo, stream - 1)) + internal.after = str(_StreamToken(topo, stream)) |