diff options
author | Eric Eastwood <eric.eastwood@beta.gouv.fr> | 2024-06-17 11:27:14 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-17 11:27:14 -0500 |
commit | e5b8a3e37f10168953124282c296821b9d9d81ad (patch) | |
tree | 0f3bea2d5067d6a5bd5be08c23fb5ad159a09577 /synapse/storage | |
parent | Merge branch 'release-v1.109' into develop (diff) | |
download | synapse-e5b8a3e37f10168953124282c296821b9d9d81ad.tar.xz |
Add `stream_ordering` sort to Sliding Sync `/sync` (#17293)
Sort is no longer configurable and we always sort rooms by the `stream_ordering` of the last event in the room or the point where the user can see up to in cases of leave/ban/invite/knock.
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/stream.py | 42 |
1 files changed, 36 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 61373f0bfb..ff0d723684 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -895,7 +895,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "get_room_event_before_stream_ordering", _f ) - async def get_last_event_in_room_before_stream_ordering( + async def get_last_event_id_in_room_before_stream_ordering( self, room_id: str, end_token: RoomStreamToken, @@ -910,10 +910,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): The ID of the most recent event, or None if there are no events in the room before this stream ordering. """ + last_event_result = ( + await self.get_last_event_pos_in_room_before_stream_ordering( + room_id, end_token + ) + ) + + if last_event_result: + return last_event_result[0] + + return None + + async def get_last_event_pos_in_room_before_stream_ordering( + self, + room_id: str, + end_token: RoomStreamToken, + ) -> Optional[Tuple[str, PersistedEventPosition]]: + """ + Returns the ID and event position of the last event in a room at or before a + stream ordering. + + Args: + room_id + end_token: The token used to stream from + + Returns: + The ID of the most recent event and it's position, or None if there are no + events in the room before this stream ordering. + """ - def get_last_event_in_room_before_stream_ordering_txn( + def get_last_event_pos_in_room_before_stream_ordering_txn( txn: LoggingTransaction, - ) -> Optional[str]: + ) -> Optional[Tuple[str, PersistedEventPosition]]: # We're looking for the closest event at or before the token. We need to # handle the fact that the stream token can be a vector clock (with an # `instance_map`) and events can be persisted on different instances @@ -975,13 +1003,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): topological_ordering=topological_ordering, stream_ordering=stream_ordering, ): - return event_id + return event_id, PersistedEventPosition( + instance_name, stream_ordering + ) return None return await self.db_pool.runInteraction( - "get_last_event_in_room_before_stream_ordering", - get_last_event_in_room_before_stream_ordering_txn, + "get_last_event_pos_in_room_before_stream_ordering", + get_last_event_pos_in_room_before_stream_ordering_txn, ) async def get_current_room_stream_token_for_room_id( |