diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d048dd9579..fd59c2d318 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -728,6 +728,30 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
assert int(limit) >= 0
+ # For backwards compatibility we need to check if the token has a
+ # topological part but no chunk part. If that's the case we can use the
+ # stream part to generate an appropriate topological token.
+ if from_token.chunk is None and from_token.topological is not None:
+ res = self._simple_select_one_txn(
+ txn,
+ table="events",
+ keyvalues={
+ "stream_ordering": from_token.stream,
+ },
+ retcols=(
+ "chunk_id",
+ "topological_ordering",
+ "stream_ordering",
+ ),
+ allow_none=True,
+ )
+ if res and res["chunk_id"] is not None:
+ from_token = RoomStreamToken(
+ res["chunk_id"],
+ res["topological_ordering"],
+ res["stream_ordering"],
+ )
+
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
@@ -778,13 +802,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
iterated_chunks = []
chunk_id = None
- if from_token.chunk: # FIXME: may be topological but no chunk.
- if rows:
- chunk_id = rows[-1].chunk_id
- iterated_chunks = [r.chunk_id for r in rows]
- else:
- chunk_id = from_token.chunk
- iterated_chunks = [chunk_id]
+ if rows:
+ chunk_id = rows[-1].chunk_id
+ iterated_chunks = [r.chunk_id for r in rows]
+ elif from_token.chunk:
+ chunk_id = from_token.chunk
+ iterated_chunks = [chunk_id]
table = ChunkDBOrderedListStore(
txn, room_id, self.clock,
|