summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/device.py4
-rw-r--r--synapse/handlers/room.py2
-rw-r--r--synapse/replication/tcp/client.py6
-rw-r--r--synapse/storage/databases/main/stream.py3
-rw-r--r--synapse/storage/persist_events.py5
5 files changed, 9 insertions, 11 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 4149520d6c..b9d9098104 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,7 +29,6 @@ from synapse.api.errors import (
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import (
-    RoomStreamToken,
     StreamToken,
     get_domain_from_id,
     get_verify_key_from_cross_signing_key,
@@ -113,8 +112,7 @@ class DeviceWorkerHandler(BaseHandler):
 
         set_tag("user_id", user_id)
         set_tag("from_token", from_token)
-        now_room_id = self.store.get_room_max_stream_ordering()
-        now_room_key = RoomStreamToken(None, now_room_id)
+        now_room_key = self.store.get_room_max_token()
 
         room_ids = await self.store.get_rooms_for_user(user_id)
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 11bf146bed..b34d014db1 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1141,7 +1141,7 @@ class RoomEventSource:
         return (events, end_key)
 
     def get_current_key(self) -> RoomStreamToken:
-        return RoomStreamToken(None, self.store.get_room_max_stream_ordering())
+        return self.store.get_room_max_token()
 
     def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
         return self.store.get_room_events_max_id(room_id)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 55af3d41ea..e165429cad 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -29,7 +29,7 @@ from synapse.replication.tcp.streams.events import (
     EventsStreamEventRow,
     EventsStreamRow,
 )
-from synapse.types import PersistedEventPosition, RoomStreamToken, UserID
+from synapse.types import PersistedEventPosition, UserID
 from synapse.util.async_helpers import timeout_deferred
 from synapse.util.metrics import Measure
 
@@ -152,9 +152,7 @@ class ReplicationDataHandler:
                 if event.type == EventTypes.Member:
                     extra_users = (UserID.from_string(event.state_key),)
 
-                max_token = RoomStreamToken(
-                    None, self.store.get_room_max_stream_ordering()
-                )
+                max_token = self.store.get_room_max_token()
                 event_pos = PersistedEventPosition(instance_name, token)
                 self.notifier.on_new_room_event(
                     event, event_pos, max_token, extra_users
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 92e96468b4..255d363b33 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -305,6 +305,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
     def get_room_min_stream_ordering(self) -> int:
         raise NotImplementedError()
 
+    def get_room_max_token(self) -> RoomStreamToken:
+        return RoomStreamToken(None, self.get_room_max_stream_ordering())
+
     async def get_room_events_stream_for_rooms(
         self,
         room_ids: Collection[str],
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 603cd7d825..c7f1170dc8 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -229,7 +229,7 @@ class EventsPersistenceStorage:
             defer.gatherResults(deferreds, consumeErrors=True)
         )
 
-        return RoomStreamToken(None, self.main_store.get_current_events_token())
+        return self.main_store.get_room_max_token()
 
     async def persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
@@ -247,11 +247,10 @@ class EventsPersistenceStorage:
 
         await make_deferred_yieldable(deferred)
 
-        max_persisted_id = self.main_store.get_current_events_token()
         event_stream_id = event.internal_metadata.stream_ordering
 
         pos = PersistedEventPosition(self._instance_name, event_stream_id)
-        return pos, RoomStreamToken(None, max_persisted_id)
+        return pos, self.main_store.get_room_max_token()
 
     def _maybe_start_persisting(self, room_id: str):
         async def persisting_queue(item):