summary refs log tree commit diff
path: root/synapse/storage/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--synapse/storage/stream.py32
1 files changed, 18 insertions, 14 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d3adb0bf37..ce98587608 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -738,16 +738,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     def has_room_changed_since(self, room_id, stream_id):
         return self._events_stream_cache.has_entity_changed(room_id, stream_id)
 
-    def paginate_room_events_txn(self, txn, room_id, from_key, to_key=None,
+    def paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
                                  direction='b', limit=-1, event_filter=None):
         """Returns list of events before or after a given token.
 
         Args:
             txn
             room_id (str)
-            from_key (str): The token used to stream from
-            to_key (str|None): A token which if given limits the results to
-                only those before
+            from_token (RoomStreamToken): The token used to stream from
+            to_token (RoomStreamToken|None): A token which if given limits the
+                results to only those before
             direction(char): Either 'b' or 'f' to indicate whether we are
                 paginating forwards or backwards from `from_key`.
             limit (int): The maximum number of events to return. Zero or less
@@ -757,7 +757,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         Returns:
             tuple[list[dict], str]: Returns the results as a list of dicts and
-            a token that points to the end of the result set. The dicts have
+            a token that points to the end of the result set. The dicts haveq
             the keys "event_id", "toplogical_ordering" and "stream_orderign".
         """
         # Tokens really represent positions between elements, but we use
@@ -767,20 +767,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         if direction == 'b':
             order = "DESC"
             bounds = upper_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
+                from_token, self.database_engine
             )
-            if to_key:
+            if to_token:
                 bounds = "%s AND %s" % (bounds, lower_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
+                    to_token, self.database_engine
                 ))
         else:
             order = "ASC"
             bounds = lower_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
+                from_token, self.database_engine
             )
-            if to_key:
+            if to_token:
                 bounds = "%s AND %s" % (bounds, upper_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
+                    to_token, self.database_engine
                 ))
 
         filter_clause, filter_args = filter_to_clause(event_filter)
@@ -821,12 +821,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 # when we are going backwards so we subtract one from the
                 # stream part.
                 toke -= 1
-            next_token = str(RoomStreamToken(topo, toke))
+            next_token = RoomStreamToken(topo, toke)
         else:
             # TODO (erikj): We should work out what to do here instead.
-            next_token = to_key if to_key else from_key
+            next_token = to_token if to_token else from_token
 
-        return rows, next_token,
+        return rows, str(next_token),
 
     @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
@@ -851,6 +851,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             the keys "event_id", "toplogical_ordering" and "stream_orderign".
         """
 
+        from_key = RoomStreamToken.parse(from_key)
+        if to_key:
+            to_key = RoomStreamToken.parse(to_key)
+
         rows, token = yield self.runInteraction(
             "paginate_room_events", self.paginate_room_events_txn,
             room_id, from_key, to_key, direction, limit, event_filter,