summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-10-05 12:46:28 +0300
committerGitHub <noreply@github.com>2023-10-05 10:46:28 +0100
commit009b47badfed7593cff5f8acbd61e8fddb3ca788 (patch)
tree98061b9652da95e5892430c5058d2880156d0ea9 /synapse/storage
parentAdd type hints to synmark. (#16421) (diff)
downloadsynapse-009b47badfed7593cff5f8acbd61e8fddb3ca788.tar.xz
Factor out `MultiWriter` token from `RoomStreamToken` (#16427)
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/stream.py22
1 files changed, 13 insertions, 9 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 5a3611c415..ea06e4eee0 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -266,7 +266,7 @@ def generate_next_token(
         # when we are going backwards so we subtract one from the
         # stream part.
         last_stream_ordering -= 1
-    return RoomStreamToken(last_topo_ordering, last_stream_ordering)
+    return RoomStreamToken(topological=last_topo_ordering, stream=last_stream_ordering)
 
 
 def _make_generic_sql_bound(
@@ -558,7 +558,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 if p > min_pos
             }
 
-        return RoomStreamToken(None, min_pos, immutabledict(positions))
+        return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions))
 
     async def get_room_events_stream_for_rooms(
         self,
@@ -708,7 +708,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             ret.reverse()
 
         if rows:
-            key = RoomStreamToken(None, min(r.stream_ordering for r in rows))
+            key = RoomStreamToken(stream=min(r.stream_ordering for r in rows))
         else:
             # Assume we didn't get anything because there was nothing to
             # get.
@@ -969,7 +969,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         topo = await self.db_pool.runInteraction(
             "_get_max_topological_txn", self._get_max_topological_txn, room_id
         )
-        return RoomStreamToken(topo, stream_ordering)
+        return RoomStreamToken(topological=topo, stream=stream_ordering)
 
     @overload
     def get_stream_id_for_event_txn(
@@ -1033,7 +1033,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             retcols=("stream_ordering", "topological_ordering"),
             desc="get_topological_token_for_event",
         )
-        return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])
+        return RoomStreamToken(
+            topological=row["topological_ordering"], stream=row["stream_ordering"]
+        )
 
     async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
         """Gets the topological token in a room after or at the given stream
@@ -1114,8 +1116,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             else:
                 topo = None
             internal = event.internal_metadata
-            internal.before = RoomStreamToken(topo, stream - 1)
-            internal.after = RoomStreamToken(topo, stream)
+            internal.before = RoomStreamToken(topological=topo, stream=stream - 1)
+            internal.after = RoomStreamToken(topological=topo, stream=stream)
             internal.order = (int(topo) if topo else 0, int(stream))
 
     async def get_events_around(
@@ -1191,11 +1193,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         # Paginating backwards includes the event at the token, but paginating
         # forward doesn't.
         before_token = RoomStreamToken(
-            results["topological_ordering"] - 1, results["stream_ordering"]
+            topological=results["topological_ordering"] - 1,
+            stream=results["stream_ordering"],
         )
 
         after_token = RoomStreamToken(
-            results["topological_ordering"], results["stream_ordering"]
+            topological=results["topological_ordering"],
+            stream=results["stream_ordering"],
         )
 
         rows, start_token = self._paginate_room_events_txn(