summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorNick Mills-Barrett <nick@fizzadar.com>2022-04-13 11:38:35 +0100
committerGitHub <noreply@github.com>2022-04-13 11:38:35 +0100
commite3a49f4784d5c915355ac9306e60b09433db60b5 (patch)
tree37a4bfca3764b4792738abc1d22d4ff4419af317 /synapse/storage/databases/main
parentUse poetry lockfile in twisted trunk CI job (#12425) (diff)
downloadsynapse-e3a49f4784d5c915355ac9306e60b09433db60b5.tar.xz
Fix missing sync events during historical batch imports (#12319)
Discovered after much in-depth investigation in #12281.

Closes: #12281
Closes: #3305

Signed off by: Nick Mills-Barrett nick@beeper.com
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/stream.py26
1 files changed, 26 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 6d45a8a9f6..793e906630 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -758,6 +758,32 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "get_room_event_before_stream_ordering", _f
         )
 
+    async def get_last_event_in_room_before_stream_ordering(
+        self,
+        room_id: str,
+        end_token: RoomStreamToken,
+    ) -> Optional[EventBase]:
+        """Returns the last event in a room at or before a stream ordering
+
+        Args:
+            room_id
+            end_token: The token used to stream from
+
+        Returns:
+            The most recent event.
+        """
+
+        last_row = await self.get_room_event_before_stream_ordering(
+            room_id=room_id,
+            stream_ordering=end_token.stream,
+        )
+        if last_row:
+            _, _, event_id = last_row
+            event = await self.get_event(event_id, get_prev_content=True)
+            return event
+
+        return None
+
     async def get_current_room_stream_token_for_room_id(
         self, room_id: Optional[str] = None
     ) -> RoomStreamToken: