summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-05-09 11:59:45 +0100
committerErik Johnston <erik@matrix.org>2018-05-09 13:43:39 +0100
commite2accd7f1d21e34181dd4543eca30ad1ea971b4c (patch)
treeabfd1d074f7a123141bceeba345b47467822a8b2 /synapse/storage/stream.py
parentDon't unnecessarily require token to be stream token (diff)
downloadsynapse-e2accd7f1d21e34181dd4543eca30ad1ea971b4c.tar.xz
Refactor sync APIs to reuse pagination API
The sync API often returns events in a topological rather than stream
ordering, e.g. when the user joined the room or on initial sync. When
this happens we can reuse existing pagination storage functions.
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py73
1 files changed, 35 insertions, 38 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 5e4327bb96..8bb4e85709 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -233,52 +233,49 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     @defer.inlineCallbacks
     def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
                                         order='DESC'):
-        # Note: If from_key is None then we return in topological order. This
-        # is because in that case we're using this as a "get the last few messages
-        # in a room" function, rather than "get new messages since last sync"
-        if from_key is not None:
-            from_id = RoomStreamToken.parse_stream_token(from_key).stream
-        else:
-            from_id = None
-        to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
+        """Get new room events in stream ordering since `from_key`.
+
+        Args:
+            room_id (str)
+            from_key (str): Token from which no events are returned before
+            to_key (str): Token from which no events are returned after. (This
+                is typically the current stream token)
+            limit (int): Maximum number of events to return
+            order (str): Either "DESC" or "ASC". Determines which events are
+                returned when the result is limited. If "DESC" then the most
+                recent `limit` events are returned, otherwise returns the
+                oldest `limit` events.
+
+        Returns:
+            Deferred[tuple[list[FrozenEvent], str]]: Returns the list of
+            events (in ascending order) and the token from the start of
+            the chunk of events returned.
+        """
         if from_key == to_key:
             defer.returnValue(([], from_key))
 
-        if from_id:
-            has_changed = yield self._events_stream_cache.has_entity_changed(
-                room_id, from_id
-            )
+        from_id = RoomStreamToken.parse_stream_token(from_key).stream
+        to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
-            if not has_changed:
-                defer.returnValue(([], from_key))
+        has_changed = yield self._events_stream_cache.has_entity_changed(
+            room_id, from_id
+        )
 
-        def f(txn):
-            if from_id is not None:
-                sql = (
-                    "SELECT event_id, stream_ordering FROM events WHERE"
-                    " room_id = ?"
-                    " AND not outlier"
-                    " AND stream_ordering > ? AND stream_ordering <= ?"
-                    " ORDER BY stream_ordering %s LIMIT ?"
-                ) % (order,)
-                txn.execute(sql, (room_id, from_id, to_id, limit))
-
-                rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
-            else:
-                sql = (
-                    "SELECT event_id, topological_ordering, stream_ordering"
-                    " FROM events"
-                    " WHERE"
-                    " room_id = ?"
-                    " AND not outlier"
-                    " AND stream_ordering <= ?"
-                    " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
-                ) % (order, order,)
-                txn.execute(sql, (room_id, to_id, limit))
+        if not has_changed:
+            defer.returnValue(([], from_key))
 
-                rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+        def f(txn):
+            sql = (
+                "SELECT event_id, stream_ordering FROM events WHERE"
+                " room_id = ?"
+                " AND not outlier"
+                " AND stream_ordering > ? AND stream_ordering <= ?"
+                " ORDER BY stream_ordering %s LIMIT ?"
+            ) % (order,)
+            txn.execute(sql, (room_id, from_id, to_id, limit))
 
+            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
             return rows
 
         rows = yield self.runInteraction("get_room_events_stream_for_room", f)