diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/federation.py | 16 | ||||
-rw-r--r-- | synapse/handlers/message.py | 6 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 10 |
3 files changed, 18 insertions, 14 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ea9264e751..9f773aefa7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -74,6 +74,8 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( JsonDict, MutableStateMap, + PersistedEventPosition, + RoomStreamToken, StateMap, UserID, get_domain_from_id, @@ -2956,7 +2958,7 @@ class FederationHandler(BaseHandler): ) return result["max_stream_id"] else: - max_stream_id = await self.storage.persistence.persist_events( + max_stream_token = await self.storage.persistence.persist_events( event_and_contexts, backfilled=backfilled ) @@ -2967,12 +2969,12 @@ class FederationHandler(BaseHandler): if not backfilled: # Never notify for backfilled events for event, _ in event_and_contexts: - await self._notify_persisted_event(event, max_stream_id) + await self._notify_persisted_event(event, max_stream_token) - return max_stream_id + return max_stream_token.stream async def _notify_persisted_event( - self, event: EventBase, max_stream_id: int + self, event: EventBase, max_stream_token: RoomStreamToken ) -> None: """Checks to see if notifier/pushers should be notified about the event or not. @@ -2998,9 +3000,11 @@ class FederationHandler(BaseHandler): elif event.internal_metadata.is_outlier(): return - event_stream_id = event.internal_metadata.stream_ordering + event_pos = PersistedEventPosition( + self._instance_name, event.internal_metadata.stream_ordering + ) self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users + event, event_pos, max_stream_token, extra_users=extra_users ) async def _clean_room_for_join(self, room_id: str) -> None: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6ee559fd1d..ee271e85e5 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1138,7 +1138,7 @@ class EventCreationHandler: if prev_state_ids: raise AuthError(403, "Changing the room create event is forbidden") - event_stream_id, max_stream_id = await self.storage.persistence.persist_event( + event_pos, max_stream_token = await self.storage.persistence.persist_event( event, context=context ) @@ -1149,7 +1149,7 @@ class EventCreationHandler: def _notify(): try: self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users + event, event_pos, max_stream_token, extra_users=extra_users ) except Exception: logger.exception("Error notifying about new room event") @@ -1161,7 +1161,7 @@ class EventCreationHandler: # matters as sometimes presence code can take a while. run_in_background(self._bump_active_time, requester.user) - return event_stream_id + return event_pos.stream async def _bump_active_time(self, user: UserID) -> None: try: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9b3a4f638b..e948efef2e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -967,7 +967,7 @@ class SyncHandler: raise NotImplementedError() else: joined_room_ids = await self.get_rooms_for_user_at( - user_id, now_token.room_stream_id + user_id, now_token.room_key ) sync_result_builder = SyncResultBuilder( sync_config, @@ -1916,7 +1916,7 @@ class SyncHandler: raise Exception("Unrecognized rtype: %r", room_builder.rtype) async def get_rooms_for_user_at( - self, user_id: str, stream_ordering: int + self, user_id: str, room_key: RoomStreamToken ) -> FrozenSet[str]: """Get set of joined rooms for a user at the given stream ordering. @@ -1942,15 +1942,15 @@ class SyncHandler: # If the membership's stream ordering is after the given stream # ordering, we need to go and work out if the user was in the room # before. - for room_id, membership_stream_ordering in joined_rooms: - if membership_stream_ordering <= stream_ordering: + for room_id, event_pos in joined_rooms: + if not event_pos.persisted_after(room_key): joined_room_ids.add(room_id) continue logger.info("User joined room after current token: %s", room_id) extrems = await self.store.get_forward_extremeties_for_room( - room_id, stream_ordering + room_id, event_pos.stream ) users_in_room = await self.state.get_current_users_in_room(room_id, extrems) if user_id in users_in_room: |