From e3a49f4784d5c915355ac9306e60b09433db60b5 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Wed, 13 Apr 2022 11:38:35 +0100 Subject: Fix missing sync events during historical batch imports (#12319) Discovered after much in-depth investigation in #12281. Closes: #12281 Closes: #3305 Signed off by: Nick Mills-Barrett nick@beeper.com --- synapse/storage/databases/main/stream.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) (limited to 'synapse/storage/databases/main/stream.py') diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 6d45a8a9f6..793e906630 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -758,6 +758,32 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_room_event_before_stream_ordering", _f ) + async def get_last_event_in_room_before_stream_ordering( + self, + room_id: str, + end_token: RoomStreamToken, + ) -> Optional[EventBase]: + """Returns the last event in a room at or before a stream ordering + + Args: + room_id + end_token: The token used to stream from + + Returns: + The most recent event. + """ + + last_row = await self.get_room_event_before_stream_ordering( + room_id=room_id, + stream_ordering=end_token.stream, + ) + if last_row: + _, _, event_id = last_row + event = await self.get_event(event_id, get_prev_content=True) + return event + + return None + async def get_current_room_stream_token_for_room_id( self, room_id: Optional[str] = None ) -> RoomStreamToken: -- cgit 1.4.1