summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/stream.py42
1 files changed, 36 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 61373f0bfb..ff0d723684 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -895,7 +895,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "get_room_event_before_stream_ordering", _f
         )
 
-    async def get_last_event_in_room_before_stream_ordering(
+    async def get_last_event_id_in_room_before_stream_ordering(
         self,
         room_id: str,
         end_token: RoomStreamToken,
@@ -910,10 +910,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             The ID of the most recent event, or None if there are no events in the room
             before this stream ordering.
         """
+        last_event_result = (
+            await self.get_last_event_pos_in_room_before_stream_ordering(
+                room_id, end_token
+            )
+        )
+
+        if last_event_result:
+            return last_event_result[0]
+
+        return None
+
+    async def get_last_event_pos_in_room_before_stream_ordering(
+        self,
+        room_id: str,
+        end_token: RoomStreamToken,
+    ) -> Optional[Tuple[str, PersistedEventPosition]]:
+        """
+        Returns the ID and event position of the last event in a room at or before a
+        stream ordering.
+
+        Args:
+            room_id
+            end_token: The token used to stream from
+
+        Returns:
+            The ID of the most recent event and it's position, or None if there are no
+            events in the room before this stream ordering.
+        """
 
-        def get_last_event_in_room_before_stream_ordering_txn(
+        def get_last_event_pos_in_room_before_stream_ordering_txn(
             txn: LoggingTransaction,
-        ) -> Optional[str]:
+        ) -> Optional[Tuple[str, PersistedEventPosition]]:
             # We're looking for the closest event at or before the token. We need to
             # handle the fact that the stream token can be a vector clock (with an
             # `instance_map`) and events can be persisted on different instances
@@ -975,13 +1003,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                     topological_ordering=topological_ordering,
                     stream_ordering=stream_ordering,
                 ):
-                    return event_id
+                    return event_id, PersistedEventPosition(
+                        instance_name, stream_ordering
+                    )
 
             return None
 
         return await self.db_pool.runInteraction(
-            "get_last_event_in_room_before_stream_ordering",
-            get_last_event_in_room_before_stream_ordering_txn,
+            "get_last_event_pos_in_room_before_stream_ordering",
+            get_last_event_pos_in_room_before_stream_ordering_txn,
         )
 
     async def get_current_room_stream_token_for_room_id(