From e3a49f4784d5c915355ac9306e60b09433db60b5 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Wed, 13 Apr 2022 11:38:35 +0100 Subject: Fix missing sync events during historical batch imports (#12319) Discovered after much in-depth investigation in #12281. Closes: #12281 Closes: #3305 Signed off by: Nick Mills-Barrett nick@beeper.com --- synapse/handlers/message.py | 14 +++++--------- synapse/handlers/sync.py | 15 +++++++-------- synapse/storage/databases/main/stream.py | 26 ++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 17 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 47a63005a9..1b092e900e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -175,17 +175,13 @@ class MessageHandler: state_filter = state_filter or StateFilter.all() if at_token: - # FIXME this claims to get the state at a stream position, but - # get_recent_events_for_room operates by topo ordering. This therefore - # does not reliably give you the state at the given stream position. - # (https://github.com/matrix-org/synapse/issues/3305) - last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=at_token.room_key, limit=1 + last_event = await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=at_token.room_key, ) - if not last_events: + if not last_event: raise NotFoundError("Can't find event for token %s" % (at_token,)) - last_event = last_events[0] # check whether the user is in the room at that time to determine # whether they should be treated as peeking. @@ -204,7 +200,7 @@ class MessageHandler: visible_events = await filter_events_for_client( self.storage, user_id, - last_events, + [last_event], filter_send_to_client=False, is_peeking=is_peeking, ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6c8b17c420..5125126a80 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -661,16 +661,15 @@ class SyncHandler: stream_position: point at which to get state state_filter: The state filter used to fetch state from the database. """ - # FIXME this claims to get the state at a stream position, but - # get_recent_events_for_room operates by topo ordering. This therefore - # does not reliably give you the state at the given stream position. - # (https://github.com/matrix-org/synapse/issues/3305) - last_events, _ = await self.store.get_recent_events_for_room( - room_id, end_token=stream_position.room_key, limit=1 + # FIXME: This gets the state at the latest event before the stream ordering, + # which might not be the same as the "current state" of the room at the time + # of the stream token if there were multiple forward extremities at the time. + last_event = await self.store.get_last_event_in_room_before_stream_ordering( + room_id, + end_token=stream_position.room_key, ) - if last_events: - last_event = last_events[-1] + if last_event: state = await self.get_state_after_event( last_event, state_filter=state_filter or StateFilter.all() ) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 6d45a8a9f6..793e906630 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -758,6 +758,32 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_room_event_before_stream_ordering", _f ) + async def get_last_event_in_room_before_stream_ordering( + self, + room_id: str, + end_token: RoomStreamToken, + ) -> Optional[EventBase]: + """Returns the last event in a room at or before a stream ordering + + Args: + room_id + end_token: The token used to stream from + + Returns: + The most recent event. + """ + + last_row = await self.get_room_event_before_stream_ordering( + room_id=room_id, + stream_ordering=end_token.stream, + ) + if last_row: + _, _, event_id = last_row + event = await self.get_event(event_id, get_prev_content=True) + return event + + return None + async def get_current_room_stream_token_for_room_id( self, room_id: Optional[str] = None ) -> RoomStreamToken: -- cgit 1.4.1