diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 61373f0bfb..ff0d723684 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -895,7 +895,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"get_room_event_before_stream_ordering", _f
)
- async def get_last_event_in_room_before_stream_ordering(
+ async def get_last_event_id_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
@@ -910,10 +910,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
The ID of the most recent event, or None if there are no events in the room
before this stream ordering.
"""
+ last_event_result = (
+ await self.get_last_event_pos_in_room_before_stream_ordering(
+ room_id, end_token
+ )
+ )
+
+ if last_event_result:
+ return last_event_result[0]
+
+ return None
+
+ async def get_last_event_pos_in_room_before_stream_ordering(
+ self,
+ room_id: str,
+ end_token: RoomStreamToken,
+ ) -> Optional[Tuple[str, PersistedEventPosition]]:
+ """
+ Returns the ID and event position of the last event in a room at or before a
+ stream ordering.
+
+ Args:
+ room_id
+ end_token: The token used to stream from
+
+ Returns:
+ The ID of the most recent event and it's position, or None if there are no
+ events in the room before this stream ordering.
+ """
- def get_last_event_in_room_before_stream_ordering_txn(
+ def get_last_event_pos_in_room_before_stream_ordering_txn(
txn: LoggingTransaction,
- ) -> Optional[str]:
+ ) -> Optional[Tuple[str, PersistedEventPosition]]:
# We're looking for the closest event at or before the token. We need to
# handle the fact that the stream token can be a vector clock (with an
# `instance_map`) and events can be persisted on different instances
@@ -975,13 +1003,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
):
- return event_id
+ return event_id, PersistedEventPosition(
+ instance_name, stream_ordering
+ )
return None
return await self.db_pool.runInteraction(
- "get_last_event_in_room_before_stream_ordering",
- get_last_event_in_room_before_stream_ordering_txn,
+ "get_last_event_pos_in_room_before_stream_ordering",
+ get_last_event_pos_in_room_before_stream_ordering_txn,
)
async def get_current_room_stream_token_for_room_id(
|