summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py16
-rw-r--r--synapse/handlers/message.py6
-rw-r--r--synapse/handlers/sync.py10
3 files changed, 18 insertions, 14 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ea9264e751..9f773aefa7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -74,6 +74,8 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.types import (
     JsonDict,
     MutableStateMap,
+    PersistedEventPosition,
+    RoomStreamToken,
     StateMap,
     UserID,
     get_domain_from_id,
@@ -2956,7 +2958,7 @@ class FederationHandler(BaseHandler):
             )
             return result["max_stream_id"]
         else:
-            max_stream_id = await self.storage.persistence.persist_events(
+            max_stream_token = await self.storage.persistence.persist_events(
                 event_and_contexts, backfilled=backfilled
             )
 
@@ -2967,12 +2969,12 @@ class FederationHandler(BaseHandler):
 
             if not backfilled:  # Never notify for backfilled events
                 for event, _ in event_and_contexts:
-                    await self._notify_persisted_event(event, max_stream_id)
+                    await self._notify_persisted_event(event, max_stream_token)
 
-            return max_stream_id
+            return max_stream_token.stream
 
     async def _notify_persisted_event(
-        self, event: EventBase, max_stream_id: int
+        self, event: EventBase, max_stream_token: RoomStreamToken
     ) -> None:
         """Checks to see if notifier/pushers should be notified about the
         event or not.
@@ -2998,9 +3000,11 @@ class FederationHandler(BaseHandler):
         elif event.internal_metadata.is_outlier():
             return
 
-        event_stream_id = event.internal_metadata.stream_ordering
+        event_pos = PersistedEventPosition(
+            self._instance_name, event.internal_metadata.stream_ordering
+        )
         self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id, extra_users=extra_users
+            event, event_pos, max_stream_token, extra_users=extra_users
         )
 
     async def _clean_room_for_join(self, room_id: str) -> None:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6ee559fd1d..ee271e85e5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1138,7 +1138,7 @@ class EventCreationHandler:
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")
 
-        event_stream_id, max_stream_id = await self.storage.persistence.persist_event(
+        event_pos, max_stream_token = await self.storage.persistence.persist_event(
             event, context=context
         )
 
@@ -1149,7 +1149,7 @@ class EventCreationHandler:
         def _notify():
             try:
                 self.notifier.on_new_room_event(
-                    event, event_stream_id, max_stream_id, extra_users=extra_users
+                    event, event_pos, max_stream_token, extra_users=extra_users
                 )
             except Exception:
                 logger.exception("Error notifying about new room event")
@@ -1161,7 +1161,7 @@ class EventCreationHandler:
             # matters as sometimes presence code can take a while.
             run_in_background(self._bump_active_time, requester.user)
 
-        return event_stream_id
+        return event_pos.stream
 
     async def _bump_active_time(self, user: UserID) -> None:
         try:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9b3a4f638b..e948efef2e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -967,7 +967,7 @@ class SyncHandler:
             raise NotImplementedError()
         else:
             joined_room_ids = await self.get_rooms_for_user_at(
-                user_id, now_token.room_stream_id
+                user_id, now_token.room_key
             )
         sync_result_builder = SyncResultBuilder(
             sync_config,
@@ -1916,7 +1916,7 @@ class SyncHandler:
             raise Exception("Unrecognized rtype: %r", room_builder.rtype)
 
     async def get_rooms_for_user_at(
-        self, user_id: str, stream_ordering: int
+        self, user_id: str, room_key: RoomStreamToken
     ) -> FrozenSet[str]:
         """Get set of joined rooms for a user at the given stream ordering.
 
@@ -1942,15 +1942,15 @@ class SyncHandler:
         # If the membership's stream ordering is after the given stream
         # ordering, we need to go and work out if the user was in the room
         # before.
-        for room_id, membership_stream_ordering in joined_rooms:
-            if membership_stream_ordering <= stream_ordering:
+        for room_id, event_pos in joined_rooms:
+            if not event_pos.persisted_after(room_key):
                 joined_room_ids.add(room_id)
                 continue
 
             logger.info("User joined room after current token: %s", room_id)
 
             extrems = await self.store.get_forward_extremeties_for_room(
-                room_id, stream_ordering
+                room_id, event_pos.stream
             )
             users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
             if user_id in users_in_room: