summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-16 12:32:24 +0100
committerErik Johnston <erik@matrix.org>2024-07-17 13:59:34 +0100
commite2a88e44ef6b33a691411ff7ae93d68c2011297e (patch)
treec7d050d3f610bd52f1be1f0f46e577c56561f12c
parentAdd conn_id field (diff)
downloadsynapse-e2a88e44ef6b33a691411ff7ae93d68c2011297e.tar.xz
Use new room store to track if we've sent a room down
-rw-r--r--synapse/handlers/sliding_sync.py55
1 files changed, 44 insertions, 11 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index b108e8a4a2..618c69d5fe 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -24,6 +24,7 @@ from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set
 
 import attr
 from immutabledict import immutabledict
+from typing_extensions import assert_never
 
 from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
 from synapse.events import EventBase
@@ -342,6 +343,8 @@ class SlidingSyncHandler:
         self.relations_handler = hs.get_relations_handler()
         self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
+        self.connection_store = SlidingSyncConnectionStore()
+
     async def wait_for_sync_for_user(
         self,
         requester: Requester,
@@ -449,6 +452,12 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
+        await self.connection_store.mark_token_seen(
+            user_id,
+            conn_id=sync_config.connection_id(),
+            from_token=from_token,
+        )
+
         # Get all of the room IDs that the user should be able to see in the sync
         # response
         has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
@@ -596,7 +605,7 @@ class SlidingSyncHandler:
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
         for room_id, room_sync_config in relevant_room_map.items():
             room_sync_result = await self.get_room_sync_data(
-                user=sync_config.user,
+                sync_config=sync_config,
                 room_id=room_id,
                 room_sync_config=room_sync_config,
                 room_membership_for_user_at_to_token=room_membership_for_user_map[
@@ -612,8 +621,18 @@ class SlidingSyncHandler:
             sync_config=sync_config, to_token=to_token
         )
 
-        # TODO: Update this when we implement per-connection state
-        connection_token = 0
+        if has_lists or has_room_subscriptions:
+            connection_token = await self.connection_store.record_rooms(
+                user_id,
+                conn_id=sync_config.connection_id(),
+                from_token=from_token,
+                sent_room_ids=relevant_room_map.keys(),
+                unsent_room_ids=[],  # TODO: We currently ssume that we have sent down all updates.
+            )
+        elif from_token:
+            connection_token = from_token.connection_token
+        else:
+            connection_token = 0
 
         return SlidingSyncResult(
             next_pos=SlidingSyncStreamToken(to_token, connection_token),
@@ -1348,7 +1367,7 @@ class SlidingSyncHandler:
 
     async def get_room_sync_data(
         self,
-        user: UserID,
+        sync_config: SlidingSyncConfig,
         room_id: str,
         room_sync_config: RoomSyncConfig,
         room_membership_for_user_at_to_token: _RoomMembershipForUser,
@@ -1370,6 +1389,7 @@ class SlidingSyncHandler:
             from_token: The point in the stream to sync from.
             to_token: The point in the stream to sync up to.
         """
+        user = sync_config.user
 
         # Assemble the list of timeline events
         #
@@ -1413,14 +1433,27 @@ class SlidingSyncHandler:
             # screen of information:
             #  - Initial sync (because no `from_token` to limit us anyway)
             #  - When users `newly_joined`
-            #  - TODO: For an incremental sync where we haven't sent it down this
+            #  - For an incremental sync where we haven't sent it down this
             #    connection before
-            to_bound = (
-                from_token.stream_token.room_key
-                if from_token is not None
-                and not room_membership_for_user_at_to_token.newly_joined
-                else None
-            )
+
+            if from_token and not room_membership_for_user_at_to_token.newly_joined:
+                room_status = await self.connection_store.have_sent_room(
+                    user_id=user.to_string(),
+                    conn_id=sync_config.connection_id(),
+                    connection_token=from_token.connection_token,
+                    room_id=room_id,
+                )
+                if room_status.status == HaveSentRoomFlag.LIVE:
+                    to_bound = from_token.stream_token.room_key
+                elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
+                    assert room_status.last_token is not None
+                    to_bound = room_status.last_token
+                elif room_status.status == HaveSentRoomFlag.NEVER:
+                    to_bound = None
+                else:
+                    assert_never(room_status.status)
+            else:
+                to_bound = None
 
             timeline_events, new_room_key = await self.store.paginate_room_events(
                 room_id=room_id,