summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-08-08 10:44:17 +0100
committerGitHub <noreply@github.com>2024-08-08 10:44:17 +0100
commit44ac2aa3b69545e02c849276d979c117a2b42070 (patch)
tree0c3ff9b613c6264f5662e2ac9a9f06fbf0af6d09
parentSliding Sync: Use `stream_ordering` based timeline pagination for incremental... (diff)
downloadsynapse-44ac2aa3b69545e02c849276d979c117a2b42070.tar.xz
SSS: Implement PREVIOUSLY room tracking (#17535)
Implement tracking of rooms that have had updates that have not been
sent down to clients.

Simplified Sliding Sync (SSS)
-rw-r--r--changelog.d/17535.bugfix1
-rw-r--r--synapse/handlers/sliding_sync.py68
-rw-r--r--tests/rest/client/sliding_sync/test_connection_tracking.py72
3 files changed, 53 insertions, 88 deletions
diff --git a/changelog.d/17535.bugfix b/changelog.d/17535.bugfix
new file mode 100644
index 0000000000..c5b5da0485
--- /dev/null
+++ b/changelog.d/17535.bugfix
@@ -0,0 +1 @@
+Fix experimental sliding sync implementation to remember any updates in rooms that were not sent down immediately.
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 0fe66c8bd2..18a96843be 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -543,6 +543,9 @@ class SlidingSyncHandler:
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
         # Keep track of the rooms that we can display and need to fetch more info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
+        # The set of room IDs of all rooms that could appear in any list. These
+        # include rooms that are outside the list ranges.
+        all_rooms: Set[str] = set()
         if has_lists and sync_config.lists is not None:
             with start_active_span("assemble_sliding_window_lists"):
                 sync_room_map = await self.filter_rooms_relevant_for_sync(
@@ -561,11 +564,6 @@ class SlidingSyncHandler:
                             to_token,
                         )
 
-                    # Sort the list
-                    sorted_room_info = await self.sort_rooms(
-                        filtered_sync_room_map, to_token
-                    )
-
                     # Find which rooms are partially stated and may need to be filtered out
                     # depending on the `required_state` requested (see below).
                     partial_state_room_map = (
@@ -586,6 +584,23 @@ class SlidingSyncHandler:
                         and StateValues.LAZY in membership_state_keys
                     )
 
+                    if not lazy_loading:
+                        # Exclude partially-stated rooms unless the `required_state`
+                        # only has `["m.room.member", "$LAZY"]` for membership
+                        # (lazy-loading room members).
+                        filtered_sync_room_map = {
+                            room_id: room
+                            for room_id, room in filtered_sync_room_map.items()
+                            if not partial_state_room_map.get(room_id)
+                        }
+
+                    all_rooms.update(filtered_sync_room_map)
+
+                    # Sort the list
+                    sorted_room_info = await self.sort_rooms(
+                        filtered_sync_room_map, to_token
+                    )
+
                     ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
                     if list_config.ranges:
                         for range in list_config.ranges:
@@ -603,15 +618,6 @@ class SlidingSyncHandler:
                                 if len(room_ids_in_list) >= max_num_rooms:
                                     break
 
-                                # Exclude partially-stated rooms unless the `required_state`
-                                # only has `["m.room.member", "$LAZY"]` for membership
-                                # (lazy-loading room members).
-                                if (
-                                    partial_state_room_map.get(room_id)
-                                    and not lazy_loading
-                                ):
-                                    continue
-
                                 # Take the superset of the `RoomSyncConfig` for each room.
                                 #
                                 # Update our `relevant_room_map` with the room we're going
@@ -664,6 +670,8 @@ class SlidingSyncHandler:
                     if not room_membership_for_user_at_to_token:
                         continue
 
+                    all_rooms.add(room_id)
+
                     room_membership_for_user_map[room_id] = (
                         room_membership_for_user_at_to_token
                     )
@@ -771,12 +779,40 @@ class SlidingSyncHandler:
         )
 
         if has_lists or has_room_subscriptions:
+            # We now calculate if any rooms outside the range have had updates,
+            # which we are not sending down.
+            #
+            # We *must* record rooms that have had updates, but it is also fine
+            # to record rooms as having updates even if there might not actually
+            # be anything new for the user (e.g. due to event filters, events
+            # having happened after the user left, etc).
+            unsent_room_ids = []
+            if from_token:
+                # The set of rooms that the client (may) care about, but aren't
+                # in any list range (or subscribed to).
+                missing_rooms = all_rooms - relevant_room_map.keys()
+
+                # We now just go and try fetching any events in the above rooms
+                # to see if anything has happened since the `from_token`.
+                #
+                # TODO: Replace this with something faster. When we land the
+                # sliding sync tables that record the most recent event
+                # positions we can use that.
+                missing_event_map_by_room = (
+                    await self.store.get_room_events_stream_for_rooms(
+                        room_ids=missing_rooms,
+                        from_key=to_token.room_key,
+                        to_key=from_token.stream_token.room_key,
+                        limit=1,
+                    )
+                )
+                unsent_room_ids = list(missing_event_map_by_room)
+
             connection_position = await self.connection_store.record_rooms(
                 sync_config=sync_config,
                 from_token=from_token,
                 sent_room_ids=relevant_rooms_to_send_map.keys(),
-                # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
-                unsent_room_ids=[],
+                unsent_room_ids=unsent_room_ids,
             )
         elif from_token:
             connection_position = from_token.connection_position
diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py
index 4d8866b30a..6863c32f7c 100644
--- a/tests/rest/client/sliding_sync/test_connection_tracking.py
+++ b/tests/rest/client/sliding_sync/test_connection_tracking.py
@@ -21,8 +21,6 @@ import synapse.rest.admin
 from synapse.api.constants import EventTypes
 from synapse.rest.client import login, room, sync
 from synapse.server import HomeServer
-from synapse.types import SlidingSyncStreamToken
-from synapse.types.handlers import SlidingSyncConfig
 from synapse.util import Clock
 
 from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
@@ -130,7 +128,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
         self.helper.send(room_id1, "msg", tok=user1_tok)
 
         timeline_limit = 5
-        conn_id = "conn_id"
         sync_body = {
             "lists": {
                 "foo-list": {
@@ -170,40 +167,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
             response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
         )
 
-        # FIXME: This is a hack to record that the first room wasn't sent down
-        # sync, as we don't implement that currently.
-        sliding_sync_handler = self.hs.get_sliding_sync_handler()
-        requester = self.get_success(
-            self.hs.get_auth().get_user_by_access_token(user1_tok)
-        )
-        sync_config = SlidingSyncConfig(
-            user=requester.user,
-            requester=requester,
-            conn_id=conn_id,
-        )
-
-        parsed_initial_from_token = self.get_success(
-            SlidingSyncStreamToken.from_string(self.store, initial_from_token)
-        )
-        connection_position = self.get_success(
-            sliding_sync_handler.connection_store.record_rooms(
-                sync_config,
-                parsed_initial_from_token,
-                sent_room_ids=[],
-                unsent_room_ids=[room_id1],
-            )
-        )
-
-        # FIXME: Now fix up `from_token` with new connect position above.
-        parsed_from_token = self.get_success(
-            SlidingSyncStreamToken.from_string(self.store, from_token)
-        )
-        parsed_from_token = SlidingSyncStreamToken(
-            stream_token=parsed_from_token.stream_token,
-            connection_position=connection_position,
-        )
-        from_token = self.get_success(parsed_from_token.to_string(self.store))
-
         # We now send another event to room1, so we should sync all the missing events.
         resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
         expected_events.append(resp["event_id"])
@@ -238,7 +201,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
 
         self.helper.send(room_id1, "msg", tok=user1_tok)
 
-        conn_id = "conn_id"
         sync_body = {
             "lists": {
                 "foo-list": {
@@ -279,40 +241,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
             response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
         )
 
-        # FIXME: This is a hack to record that the first room wasn't sent down
-        # sync, as we don't implement that currently.
-        sliding_sync_handler = self.hs.get_sliding_sync_handler()
-        requester = self.get_success(
-            self.hs.get_auth().get_user_by_access_token(user1_tok)
-        )
-        sync_config = SlidingSyncConfig(
-            user=requester.user,
-            requester=requester,
-            conn_id=conn_id,
-        )
-
-        parsed_initial_from_token = self.get_success(
-            SlidingSyncStreamToken.from_string(self.store, initial_from_token)
-        )
-        connection_position = self.get_success(
-            sliding_sync_handler.connection_store.record_rooms(
-                sync_config,
-                parsed_initial_from_token,
-                sent_room_ids=[],
-                unsent_room_ids=[room_id1],
-            )
-        )
-
-        # FIXME: Now fix up `from_token` with new connect position above.
-        parsed_from_token = self.get_success(
-            SlidingSyncStreamToken.from_string(self.store, from_token)
-        )
-        parsed_from_token = SlidingSyncStreamToken(
-            stream_token=parsed_from_token.stream_token,
-            connection_position=connection_position,
-        )
-        from_token = self.get_success(parsed_from_token.to_string(self.store))
-
         # We now send another event to room1, so we should sync all the missing state.
         self.helper.send(room_id1, "msg", tok=user1_tok)