Diffstat (limited to 'synapse/storage/stream.py')
-rw-r--r--  synapse/storage/stream.py  37
1 file changed, 30 insertions, 7 deletions
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d048dd9579..fd59c2d318 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -728,6 +728,30 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         assert int(limit) >= 0
 
+        # For backwards compatibility we need to check if the token has a
+        # topological part but no chunk part. If that's the case we can use the
+        # stream part to generate an appropriate topological token.
+        if from_token.chunk is None and from_token.topological is not None:
+            res = self._simple_select_one_txn(
+                txn,
+                table="events",
+                keyvalues={
+                    "stream_ordering": from_token.stream,
+                },
+                retcols=(
+                    "chunk_id",
+                    "topological_ordering",
+                    "stream_ordering",
+                ),
+                allow_none=True,
+            )
+            if res and res["chunk_id"] is not None:
+                from_token = RoomStreamToken(
+                    res["chunk_id"],
+                    res["topological_ordering"],
+                    res["stream_ordering"],
+                )
+
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
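
For context, the block added in the hunk above upgrades a legacy token that carries a topological part but no chunk id: it looks up the event at the token's stream position and rebuilds the token from that row. A minimal standalone sketch of this step, assuming the three-field RoomStreamToken(chunk, topological, stream) used on this branch and the _simple_select_one_txn helper from SQLBaseStore (the helper name and method below are illustrative, not part of the patch):

    def _upgrade_legacy_token_txn(self, txn, token):
        # Hypothetical helper mirroring the backwards-compatibility
        # branch added above.
        if token.chunk is not None or token.topological is None:
            # Already chunk-aware, or a pure stream token; nothing to do.
            return token

        row = self._simple_select_one_txn(
            txn,
            table="events",
            keyvalues={"stream_ordering": token.stream},
            retcols=("chunk_id", "topological_ordering", "stream_ordering"),
            allow_none=True,
        )
        if row is None or row["chunk_id"] is None:
            # No matching event, or the event has not been assigned a
            # chunk yet: keep the original token.
            return token

        return RoomStreamToken(
            row["chunk_id"],
            row["topological_ordering"],
            row["stream_ordering"],
        )
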
@@ -778,13 +802,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         iterated_chunks = []
 
         chunk_id = None
-        if from_token.chunk:  # FIXME: may be topological but no chunk.
-            if rows:
-                chunk_id = rows[-1].chunk_id
-                iterated_chunks = [r.chunk_id for r in rows]
-            else:
-                chunk_id = from_token.chunk
-                iterated_chunks = [chunk_id]
+        if rows:
+            chunk_id = rows[-1].chunk_id
+            iterated_chunks = [r.chunk_id for r in rows]
+        elif from_token.chunk:
+            chunk_id = from_token.chunk
+            iterated_chunks = [chunk_id]
 
         table = ChunkDBOrderedListStore(
             txn, room_id, self.clock,
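
The second hunk removes the FIXME by letting the query results take precedence: if the query returned rows, the chunk to continue from comes from the last row, and only an empty result falls back to the chunk named in the token. A rough sketch of that selection, assuming each returned row exposes a chunk_id attribute (the function below is illustrative only):

    def _choose_start_chunk(rows, from_token):
        # Hypothetical helper mirroring the replacement logic above.
        if rows:
            # Continue from the last chunk actually seen, and remember
            # every chunk the query touched.
            return rows[-1].chunk_id, [r.chunk_id for r in rows]
        if from_token.chunk:
            # No rows, but the token already names a chunk to resume from.
            return from_token.chunk, [from_token.chunk]
        # Legacy token with no chunk and no rows: nothing to seed with.
        return None, []

Compared to the old code, a legacy token with a topological part but no chunk no longer skips this block entirely, since the earlier hunk has already converted it to a chunk-aware token where possible.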