summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py257
1 files changed, 149 insertions, 108 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index bedc3c6c52..3ccb6f8a61 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -39,6 +39,8 @@ from ._base import SQLBaseStore
 from synapse.api.errors import SynapseError
 from synapse.util.logutils import log_function
 
+from collections import namedtuple
+
 import logging
 
 
@@ -52,91 +54,79 @@ _STREAM_TOKEN = "stream"
 _TOPOLOGICAL_TOKEN = "topological"
 
 
-def _parse_stream_token(string):
-    try:
-        if string[0] != 's':
-            raise
-        return int(string[1:])
-    except:
-        raise SynapseError(400, "Invalid token")
-
-
-def _parse_topological_token(string):
-    try:
-        if string[0] != 't':
-            raise
-        parts = string[1:].split('-', 1)
-        return (int(parts[0]), int(parts[1]))
-    except:
-        raise SynapseError(400, "Invalid token")
-
-
-def is_stream_token(string):
-    try:
-        _parse_stream_token(string)
-        return True
-    except:
-        return False
-
-
-def is_topological_token(string):
-    try:
-        _parse_topological_token(string)
-        return True
-    except:
-        return False
-
-
-def _get_token_bound(token, comparison):
-    try:
-        s = _parse_stream_token(token)
-        return "%s %s %d" % ("stream_ordering", comparison, s)
-    except:
-        pass
-
-    try:
-        top, stream = _parse_topological_token(token)
-        return "%s %s %d AND %s %s %d" % (
-            "topological_ordering", comparison, top,
-            "stream_ordering", comparison, stream,
-        )
-    except:
-        pass
-
-    raise SynapseError(400, "Invalid token")
-
-
-class StreamStore(SQLBaseStore):
-    @log_function
-    def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
-                        direction='f', with_feedback=False):
-        # We deal with events request in two different ways depending on if
-        # this looks like an /events request or a pagination request.
-        is_events = (
-            direction == 'f'
-            and user_id
-            and is_stream_token(from_key)
-            and to_key and is_stream_token(to_key)
-        )
+class _StreamToken(namedtuple("_StreamToken", "topological stream")):
+    """Tokens are positions between events. The token "s1" comes after event 1.
+
+            s0    s1
+            |     |
+        [0] V [1] V [2]
+
+    Tokens can either be a point in the live event stream or a cursor going
+    through historic events.
+
+    When traversing the live event stream events are ordered by when they
+    arrived at the homeserver.
+
+    When traversing historic events the events are ordered by their depth in
+    the event graph "topological_ordering" and then by when they arrived at the
+    homeserver "stream_ordering".
+
+    Live tokens start with an "s" followed by the "stream_ordering" id of the
+    event it comes after. Historic tokens start with a "t" followed by the
+    "topological_ordering" id of the event it comes after, follewed by "-",
+    followed by the "stream_ordering" id of the event it comes after.
+    """
+    __slots__ = []
+
+    @classmethod
+    def parse(cls, string):
+        try:
+            if string[0] == 's':
+                return cls(topological=None, stream=int(string[1:]))
+            if string[0] == 't':
+                parts = string[1:].split('-', 1)
+                return cls(topological=int(parts[0]), stream=int(parts[1]))
+        except:
+            pass
+        raise SynapseError(400, "Invalid token %r" % (string,))
+
+    @classmethod
+    def parse_stream_token(cls, string):
+        try:
+            if string[0] == 's':
+                return cls(topological=None, stream=int(string[1:]))
+        except:
+            pass
+        raise SynapseError(400, "Invalid token %r" % (string,))
+
+    def __str__(self):
+        if self.topological is not None:
+            return "t%d-%d" % (self.topological, self.stream)
+        else:
+            return "s%d" % (self.stream,)
 
-        if is_events:
-            return self.get_room_events_stream(
-                user_id=user_id,
-                from_key=from_key,
-                to_key=to_key,
-                room_id=room_id,
-                limit=limit,
-                with_feedback=with_feedback,
+    def lower_bound(self):
+        if self.topological is None:
+            return "(%d < %s)" % (self.stream, "stream_ordering")
+        else:
+            return "(%d < %s OR (%d == %s AND %d < %s))" % (
+                self.topological, "topological_ordering",
+                self.topological, "topological_ordering",
+                self.stream, "stream_ordering",
             )
+
+    def upper_bound(self):
+        if self.topological is None:
+            return "(%d >= %s)" % (self.stream, "stream_ordering")
         else:
-            return self.paginate_room_events(
-                from_key=from_key,
-                to_key=to_key,
-                room_id=room_id,
-                limit=limit,
-                with_feedback=with_feedback,
+            return "(%d > %s OR (%d == %s AND %d >= %s))" % (
+                self.topological, "topological_ordering",
+                self.topological, "topological_ordering",
+                self.stream, "stream_ordering",
             )
 
+
+class StreamStore(SQLBaseStore):
     @log_function
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,
                                limit=0, with_feedback=False):
@@ -162,8 +152,8 @@ class StreamStore(SQLBaseStore):
             limit = MAX_STREAM_SIZE
 
         # From and to keys should be integers from ordering.
-        from_id = _parse_stream_token(from_key)
-        to_id = _parse_stream_token(to_key)
+        from_id = _StreamToken.parse_stream_token(from_key)
+        to_id = _StreamToken.parse_stream_token(to_key)
 
         if from_key == to_key:
             return defer.succeed(([], to_key))
@@ -181,7 +171,7 @@ class StreamStore(SQLBaseStore):
         }
 
         def f(txn):
-            txn.execute(sql, (user_id, user_id, from_id, to_id,))
+            txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,))
 
             rows = self.cursor_to_dict(txn)
 
@@ -191,8 +181,11 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(ret, rows)
+
             if rows:
                 key = "s%d" % max([r["stream_ordering"] for r in rows])
+
             else:
                 # Assume we didn't get anything because there was nothing to
                 # get.
@@ -211,17 +204,21 @@ class StreamStore(SQLBaseStore):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
-        from_comp = '<=' if direction == 'b' else '>'
-        to_comp = '>' if direction == 'b' else '<='
-        order = "DESC" if direction == 'b' else "ASC"
-
         args = [room_id]
-
-        bounds = _get_token_bound(from_key, from_comp)
-        if to_key:
-            bounds = "%s AND %s" % (
-                bounds, _get_token_bound(to_key, to_comp)
-            )
+        if direction == 'b':
+            order = "DESC"
+            bounds = _StreamToken.parse(from_key).upper_bound()
+            if to_key:
+                bounds = "%s AND %s" % (
+                    bounds, _StreamToken.parse(to_key).lower_bound()
+                )
+        else:
+            order = "ASC"
+            bounds = _StreamToken.parse(from_key).lower_bound()
+            if to_key:
+                bounds = "%s AND %s" % (
+                    bounds, _StreamToken.parse(to_key).upper_bound()
+                )
 
         if int(limit) > 0:
             args.append(int(limit))
@@ -249,9 +246,13 @@ class StreamStore(SQLBaseStore):
                 topo = rows[-1]["topological_ordering"]
                 toke = rows[-1]["stream_ordering"]
                 if direction == 'b':
-                    topo -= 1
+                    # Tokens are positions between events.
+                    # This token points *after* the last event in the chunk.
+                    # We need it to point to the event before it in the chunk
+                    # when we are going backwards so we subtract one from the
+                    # stream part.
                     toke -= 1
-                next_token = "t%s-%s" % (topo, toke)
+                next_token = str(_StreamToken(topo, toke))
             else:
                 # TODO (erikj): We should work out what to do here instead.
                 next_token = to_key if to_key else from_key
@@ -262,35 +263,62 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(events, rows)
+
             return events, next_token,
 
         return self.runInteraction("paginate_room_events", f)
 
     def get_recent_events_for_room(self, room_id, limit, end_token,
-                                   with_feedback=False):
+                                   with_feedback=False, from_token=None):
         # TODO (erikj): Handle compressed feedback
 
-        sql = (
-            "SELECT stream_ordering, topological_ordering, event_id FROM events "
-            "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
-            "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
-        )
+        end_token = _StreamToken.parse_stream_token(end_token)
 
-        def f(txn):
-            txn.execute(sql, (room_id, end_token, limit,))
+        if from_token is None:
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+        else:
+            from_token = _StreamToken.parse_stream_token(from_token)
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering > ?"
+                " AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+        def get_recent_events_for_room_txn(txn):
+            if from_token is None:
+                txn.execute(sql, (room_id, end_token.stream, limit,))
+            else:
+                txn.execute(sql, (
+                    room_id, from_token.stream, end_token.stream, limit
+                ))
 
             rows = self.cursor_to_dict(txn)
 
             rows.reverse()  # As we selected with reverse ordering
 
             if rows:
+                # Tokens are positions between events.
+                # This token points *after* the last event in the chunk.
+                # We need it to point to the event before it in the chunk
+                # since we are going backwards so we subtract one from the
+                # stream part.
                 topo = rows[0]["topological_ordering"]
-                toke = rows[0]["stream_ordering"]
-                start_token = "t%s-%s" % (topo, toke)
+                toke = rows[0]["stream_ordering"] - 1
+                start_token = str(_StreamToken(topo, toke))
 
-                token = (start_token, end_token)
+                token = (start_token, str(end_token))
             else:
-                token = (end_token, end_token)
+                token = (str(end_token), str(end_token))
 
             events = self._get_events_txn(
                 txn,
@@ -298,9 +326,13 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(events, rows)
+
             return events, token
 
-        return self.runInteraction("get_recent_events_for_room", f)
+        return self.runInteraction(
+            "get_recent_events_for_room", get_recent_events_for_room_txn
+        )
 
     def get_room_events_max_id(self):
         return self.runInteraction(
@@ -322,3 +354,12 @@ class StreamStore(SQLBaseStore):
 
         key = res[0]["m"]
         return "s%d" % (key,)
+
+    @staticmethod
+    def _set_before_and_after(events, rows):
+        for event, row in zip(events, rows):
+            stream = row["stream_ordering"]
+            topo = event.depth
+            internal = event.internal_metadata
+            internal.before = str(_StreamToken(topo, stream - 1))
+            internal.after = str(_StreamToken(topo, stream))