summary refs log tree commit diff
path: root/synapse/handlers/sliding_sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sliding_sync.py')
-rw-r--r--synapse/handlers/sliding_sync.py40
1 files changed, 26 insertions, 14 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py

index 554ab59bf3..f1f6f30b95 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py
@@ -49,6 +49,7 @@ from synapse.types import ( PersistedEventPosition, Requester, RoomStreamToken, + SlidingSyncStreamToken, StateMap, StreamKeyType, StreamToken, @@ -362,7 +363,7 @@ class SlidingSyncHandler: self, requester: Requester, sync_config: SlidingSyncConfig, - from_token: Optional[StreamToken] = None, + from_token: Optional[SlidingSyncStreamToken] = None, timeout_ms: int = 0, ) -> SlidingSyncResult: """ @@ -393,7 +394,7 @@ class SlidingSyncHandler: # this returns false, it means we timed out waiting, and we should # just return an empty response. before_wait_ts = self.clock.time_msec() - if not await self.notifier.wait_for_stream_token(from_token): + if not await self.notifier.wait_for_stream_token(from_token.stream_token): logger.warning( "Timed out waiting for worker to catch up. Returning empty response" ) @@ -431,7 +432,7 @@ class SlidingSyncHandler: sync_config.user.to_string(), timeout_ms, current_sync_callback, - from_token=from_token, + from_token=from_token.stream_token, ) return result @@ -440,7 +441,7 @@ class SlidingSyncHandler: self, sync_config: SlidingSyncConfig, to_token: StreamToken, - from_token: Optional[StreamToken] = None, + from_token: Optional[SlidingSyncStreamToken] = None, ) -> SlidingSyncResult: """ Generates the response body of a Sliding Sync result, represented as a @@ -473,7 +474,7 @@ class SlidingSyncHandler: await self.get_room_membership_for_user_at_to_token( user=sync_config.user, to_token=to_token, - from_token=from_token, + from_token=from_token.stream_token if from_token else None, ) ) @@ -631,8 +632,11 @@ class SlidingSyncHandler: to_token=to_token, ) + # TODO: Update this when we implement per-connection state + connection_token = 0 + return SlidingSyncResult( - next_pos=to_token, + next_pos=SlidingSyncStreamToken(to_token, connection_token), lists=lists, rooms=rooms, extensions=extensions, @@ -1367,7 +1371,7 @@ class SlidingSyncHandler: room_id: str, room_sync_config: RoomSyncConfig, room_membership_for_user_at_to_token: _RoomMembershipForUser, - from_token: Optional[StreamToken], + from_token: Optional[SlidingSyncStreamToken], to_token: StreamToken, ) -> SlidingSyncResult.RoomResult: """ @@ -1431,7 +1435,7 @@ class SlidingSyncHandler: # - TODO: For an incremental sync where we haven't sent it down this # connection before to_bound = ( - from_token.room_key + from_token.stream_token.room_key if from_token is not None and not room_membership_for_user_at_to_token.newly_joined else None @@ -1498,7 +1502,9 @@ class SlidingSyncHandler: instance_name=timeline_event.internal_metadata.instance_name, stream=timeline_event.internal_metadata.stream_ordering, ) - if persisted_position.persisted_after(from_token.room_key): + if persisted_position.persisted_after( + from_token.stream_token.room_key + ): num_live += 1 else: # Since we're iterating over the timeline events in @@ -1752,8 +1758,14 @@ class SlidingSyncHandler: bump_stamp = room_membership_for_user_at_to_token.event_pos.stream # But if we found a bump event, use that instead if last_bump_event_result is not None: - _, bump_event_pos = last_bump_event_result - bump_stamp = bump_event_pos.stream + _, new_bump_event_pos = last_bump_event_result + + # If we've just joined a remote room, then the last bump event may + # have been backfilled (and so have a negative stream ordering). + # These negative stream orderings can't sensibly be compared, so + # instead we use the membership event position. + if new_bump_event_pos.stream > 0: + bump_stamp = new_bump_event_pos.stream return SlidingSyncResult.RoomResult( name=room_name, @@ -1786,7 +1798,7 @@ class SlidingSyncHandler: self, sync_config: SlidingSyncConfig, to_token: StreamToken, - from_token: Optional[StreamToken], + from_token: Optional[SlidingSyncStreamToken], ) -> SlidingSyncResult.Extensions: """Handle extension requests. @@ -1900,7 +1912,7 @@ class SlidingSyncHandler: sync_config: SlidingSyncConfig, e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension, to_token: StreamToken, - from_token: Optional[StreamToken], + from_token: Optional[SlidingSyncStreamToken], ) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]: """Handle E2EE device extension (MSC3884) @@ -1922,7 +1934,7 @@ class SlidingSyncHandler: # TODO: This should take into account the `from_token` and `to_token` device_list_updates = await self.device_handler.get_user_ids_changed( user_id=user_id, - from_token=from_token, + from_token=from_token.stream_token, ) device_one_time_keys_count: Mapping[str, int] = {}