diff options
author | Erik Johnston <erik@matrix.org> | 2024-07-16 12:32:24 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2024-07-17 13:59:34 +0100 |
commit | e2a88e44ef6b33a691411ff7ae93d68c2011297e (patch) | |
tree | c7d050d3f610bd52f1be1f0f46e577c56561f12c | |
parent | Add conn_id field (diff) | |
download | synapse-e2a88e44ef6b33a691411ff7ae93d68c2011297e.tar.xz |
Use new room store to track if we've sent a room down
-rw-r--r-- | synapse/handlers/sliding_sync.py | 55 |
1 files changed, 44 insertions, 11 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index b108e8a4a2..618c69d5fe 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -24,6 +24,7 @@ from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set import attr from immutabledict import immutabledict +from typing_extensions import assert_never from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership from synapse.events import EventBase @@ -342,6 +343,8 @@ class SlidingSyncHandler: self.relations_handler = hs.get_relations_handler() self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync + self.connection_store = SlidingSyncConnectionStore() + async def wait_for_sync_for_user( self, requester: Requester, @@ -449,6 +452,12 @@ class SlidingSyncHandler: # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() + await self.connection_store.mark_token_seen( + user_id, + conn_id=sync_config.connection_id(), + from_token=from_token, + ) + # Get all of the room IDs that the user should be able to see in the sync # response has_lists = sync_config.lists is not None and len(sync_config.lists) > 0 @@ -596,7 +605,7 @@ class SlidingSyncHandler: rooms: Dict[str, SlidingSyncResult.RoomResult] = {} for room_id, room_sync_config in relevant_room_map.items(): room_sync_result = await self.get_room_sync_data( - user=sync_config.user, + sync_config=sync_config, room_id=room_id, room_sync_config=room_sync_config, room_membership_for_user_at_to_token=room_membership_for_user_map[ @@ -612,8 +621,18 @@ class SlidingSyncHandler: sync_config=sync_config, to_token=to_token ) - # TODO: Update this when we implement per-connection state - connection_token = 0 + if has_lists or has_room_subscriptions: + connection_token = await self.connection_store.record_rooms( + user_id, + conn_id=sync_config.connection_id(), + from_token=from_token, + sent_room_ids=relevant_room_map.keys(), + unsent_room_ids=[], # TODO: We currently ssume that we have sent down all updates. + ) + elif from_token: + connection_token = from_token.connection_token + else: + connection_token = 0 return SlidingSyncResult( next_pos=SlidingSyncStreamToken(to_token, connection_token), @@ -1348,7 +1367,7 @@ class SlidingSyncHandler: async def get_room_sync_data( self, - user: UserID, + sync_config: SlidingSyncConfig, room_id: str, room_sync_config: RoomSyncConfig, room_membership_for_user_at_to_token: _RoomMembershipForUser, @@ -1370,6 +1389,7 @@ class SlidingSyncHandler: from_token: The point in the stream to sync from. to_token: The point in the stream to sync up to. """ + user = sync_config.user # Assemble the list of timeline events # @@ -1413,14 +1433,27 @@ class SlidingSyncHandler: # screen of information: # - Initial sync (because no `from_token` to limit us anyway) # - When users `newly_joined` - # - TODO: For an incremental sync where we haven't sent it down this + # - For an incremental sync where we haven't sent it down this # connection before - to_bound = ( - from_token.stream_token.room_key - if from_token is not None - and not room_membership_for_user_at_to_token.newly_joined - else None - ) + + if from_token and not room_membership_for_user_at_to_token.newly_joined: + room_status = await self.connection_store.have_sent_room( + user_id=user.to_string(), + conn_id=sync_config.connection_id(), + connection_token=from_token.connection_token, + room_id=room_id, + ) + if room_status.status == HaveSentRoomFlag.LIVE: + to_bound = from_token.stream_token.room_key + elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: + assert room_status.last_token is not None + to_bound = room_status.last_token + elif room_status.status == HaveSentRoomFlag.NEVER: + to_bound = None + else: + assert_never(room_status.status) + else: + to_bound = None timeline_events, new_room_key = await self.store.paginate_room_events( room_id=room_id, |