diff options
author | Erik Johnston <erik@matrix.org> | 2024-07-16 12:13:40 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2024-07-17 12:14:28 +0100 |
commit | 1ad1cce3f2137f6be2c8d48121de27ea194fef4f (patch) | |
tree | eb7037b5e070bd80c8ab3fa13b98cf62db8944ad /synapse/handlers | |
parent | Add SlidingSyncStreamToken (diff) | |
download | synapse-1ad1cce3f2137f6be2c8d48121de27ea194fef4f.tar.xz |
Pass throught SlidingSyncStreamToken
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/sliding_sync.py | 24 |
1 files changed, 15 insertions, 9 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 1b5262d667..3aa139ff1c 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -36,6 +36,7 @@ from synapse.types import ( PersistedEventPosition, Requester, RoomStreamToken, + SlidingSyncStreamToken, StateMap, StreamKeyType, StreamToken, @@ -343,7 +344,7 @@ class SlidingSyncHandler: self, requester: Requester, sync_config: SlidingSyncConfig, - from_token: Optional[StreamToken] = None, + from_token: Optional[SlidingSyncStreamToken] = None, timeout_ms: int = 0, ) -> SlidingSyncResult: """ @@ -378,7 +379,7 @@ class SlidingSyncHandler: # this returns false, it means we timed out waiting, and we should # just return an empty response. before_wait_ts = self.clock.time_msec() - if not await self.notifier.wait_for_stream_token(from_token): + if not await self.notifier.wait_for_stream_token(from_token.stream_token): logger.warning( "Timed out waiting for worker to catch up. Returning empty response" ) @@ -416,7 +417,7 @@ class SlidingSyncHandler: sync_config.user.to_string(), timeout_ms, current_sync_callback, - from_token=from_token, + from_token=from_token.stream_token, ) return result @@ -425,7 +426,7 @@ class SlidingSyncHandler: self, sync_config: SlidingSyncConfig, to_token: StreamToken, - from_token: Optional[StreamToken] = None, + from_token: Optional[SlidingSyncStreamToken] = None, ) -> SlidingSyncResult: """ Generates the response body of a Sliding Sync result, represented as a @@ -458,7 +459,7 @@ class SlidingSyncHandler: await self.get_room_membership_for_user_at_to_token( user=sync_config.user, to_token=to_token, - from_token=from_token, + from_token=from_token.stream_token if from_token else None, ) ) @@ -609,8 +610,11 @@ class SlidingSyncHandler: sync_config=sync_config, to_token=to_token ) + # TODO: Update this when we implement per-connection state + connection_token = 0 + return SlidingSyncResult( - next_pos=to_token, + next_pos=SlidingSyncStreamToken(to_token, connection_token), lists=lists, rooms=rooms, extensions=extensions, @@ -1346,7 +1350,7 @@ class SlidingSyncHandler: room_id: str, room_sync_config: RoomSyncConfig, room_membership_for_user_at_to_token: _RoomMembershipForUser, - from_token: Optional[StreamToken], + from_token: Optional[SlidingSyncStreamToken], to_token: StreamToken, ) -> SlidingSyncResult.RoomResult: """ @@ -1410,7 +1414,7 @@ class SlidingSyncHandler: # - TODO: For an incremental sync where we haven't sent it down this # connection before to_bound = ( - from_token.room_key + from_token.stream_token.room_key if from_token is not None and not room_membership_for_user_at_to_token.newly_joined else None @@ -1477,7 +1481,9 @@ class SlidingSyncHandler: instance_name=timeline_event.internal_metadata.instance_name, stream=timeline_event.internal_metadata.stream_ordering, ) - if persisted_position.persisted_after(from_token.room_key): + if persisted_position.persisted_after( + from_token.stream_token.room_key + ): num_live += 1 else: # Since we're iterating over the timeline events in |