diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index b108e8a4a2..618c69d5fe 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -24,6 +24,7 @@ from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set
import attr
from immutabledict import immutabledict
+from typing_extensions import assert_never
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase
@@ -342,6 +343,8 @@ class SlidingSyncHandler:
self.relations_handler = hs.get_relations_handler()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
+ self.connection_store = SlidingSyncConnectionStore()
+
async def wait_for_sync_for_user(
self,
requester: Requester,
@@ -449,6 +452,12 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
+ await self.connection_store.mark_token_seen(
+ user_id,
+ conn_id=sync_config.connection_id(),
+ from_token=from_token,
+ )
+
# Get all of the room IDs that the user should be able to see in the sync
# response
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
@@ -596,7 +605,7 @@ class SlidingSyncHandler:
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
for room_id, room_sync_config in relevant_room_map.items():
room_sync_result = await self.get_room_sync_data(
- user=sync_config.user,
+ sync_config=sync_config,
room_id=room_id,
room_sync_config=room_sync_config,
room_membership_for_user_at_to_token=room_membership_for_user_map[
@@ -612,8 +621,18 @@ class SlidingSyncHandler:
sync_config=sync_config, to_token=to_token
)
- # TODO: Update this when we implement per-connection state
- connection_token = 0
+ if has_lists or has_room_subscriptions:
+ connection_token = await self.connection_store.record_rooms(
+ user_id,
+ conn_id=sync_config.connection_id(),
+ from_token=from_token,
+ sent_room_ids=relevant_room_map.keys(),
+ unsent_room_ids=[], # TODO: We currently ssume that we have sent down all updates.
+ )
+ elif from_token:
+ connection_token = from_token.connection_token
+ else:
+ connection_token = 0
return SlidingSyncResult(
next_pos=SlidingSyncStreamToken(to_token, connection_token),
@@ -1348,7 +1367,7 @@ class SlidingSyncHandler:
async def get_room_sync_data(
self,
- user: UserID,
+ sync_config: SlidingSyncConfig,
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
@@ -1370,6 +1389,7 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
"""
+ user = sync_config.user
# Assemble the list of timeline events
#
@@ -1413,14 +1433,27 @@ class SlidingSyncHandler:
# screen of information:
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
- # - TODO: For an incremental sync where we haven't sent it down this
+ # - For an incremental sync where we haven't sent it down this
# connection before
- to_bound = (
- from_token.stream_token.room_key
- if from_token is not None
- and not room_membership_for_user_at_to_token.newly_joined
- else None
- )
+
+ if from_token and not room_membership_for_user_at_to_token.newly_joined:
+ room_status = await self.connection_store.have_sent_room(
+ user_id=user.to_string(),
+ conn_id=sync_config.connection_id(),
+ connection_token=from_token.connection_token,
+ room_id=room_id,
+ )
+ if room_status.status == HaveSentRoomFlag.LIVE:
+ to_bound = from_token.stream_token.room_key
+ elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
+ assert room_status.last_token is not None
+ to_bound = room_status.last_token
+ elif room_status.status == HaveSentRoomFlag.NEVER:
+ to_bound = None
+ else:
+ assert_never(room_status.status)
+ else:
+ to_bound = None
timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id,
|