summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-06-17 11:27:14 -0500
committerGitHub <noreply@github.com>2024-06-17 11:27:14 -0500
commite5b8a3e37f10168953124282c296821b9d9d81ad (patch)
tree0f3bea2d5067d6a5bd5be08c23fb5ad159a09577 /synapse/storage/databases
parentMerge branch 'release-v1.109' into develop (diff)
downloadsynapse-e5b8a3e37f10168953124282c296821b9d9d81ad.tar.xz
Add `stream_ordering` sort to Sliding Sync `/sync` (#17293)
Sort is no longer configurable and we always sort rooms by the `stream_ordering` of the last event in the room or the point where the user can see up to in cases of leave/ban/invite/knock.
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/stream.py42
1 files changed, 36 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py

index 61373f0bfb..ff0d723684 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -895,7 +895,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_room_event_before_stream_ordering", _f ) - async def get_last_event_in_room_before_stream_ordering( + async def get_last_event_id_in_room_before_stream_ordering( self, room_id: str, end_token: RoomStreamToken, @@ -910,10 +910,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): The ID of the most recent event, or None if there are no events in the room before this stream ordering. """ + last_event_result = ( + await self.get_last_event_pos_in_room_before_stream_ordering( + room_id, end_token + ) + ) + + if last_event_result: + return last_event_result[0] + + return None + + async def get_last_event_pos_in_room_before_stream_ordering( + self, + room_id: str, + end_token: RoomStreamToken, + ) -> Optional[Tuple[str, PersistedEventPosition]]: + """ + Returns the ID and event position of the last event in a room at or before a + stream ordering. + + Args: + room_id + end_token: The token used to stream from + + Returns: + The ID of the most recent event and it's position, or None if there are no + events in the room before this stream ordering. + """ - def get_last_event_in_room_before_stream_ordering_txn( + def get_last_event_pos_in_room_before_stream_ordering_txn( txn: LoggingTransaction, - ) -> Optional[str]: + ) -> Optional[Tuple[str, PersistedEventPosition]]: # We're looking for the closest event at or before the token. We need to # handle the fact that the stream token can be a vector clock (with an # `instance_map`) and events can be persisted on different instances @@ -975,13 +1003,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): topological_ordering=topological_ordering, stream_ordering=stream_ordering, ): - return event_id + return event_id, PersistedEventPosition( + instance_name, stream_ordering + ) return None return await self.db_pool.runInteraction( - "get_last_event_in_room_before_stream_ordering", - get_last_event_in_room_before_stream_ordering_txn, + "get_last_event_pos_in_room_before_stream_ordering", + get_last_event_pos_in_room_before_stream_ordering_txn, ) async def get_current_room_stream_token_for_room_id(