summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-07-30 09:30:44 +0100
committerGitHub <noreply@github.com>2024-07-30 09:30:44 +0100
commit34306be5aa7ebf7913dd28d048887802dc7e079b (patch)
treedf6b87b9abcafb1ddcc4d4f28cd46a03aef19987
parentSliding Sync: Track whether we have sent rooms down to clients (#17447) (diff)
downloadsynapse-34306be5aa7ebf7913dd28d048887802dc7e079b.tar.xz
Only send rooms with updates down sliding sync (#17479)
Rather than always including all rooms in range.

Also adds a pre-filter to rooms that checks the stream change cache to
see if anything might have happened.

Based on #17447

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
-rw-r--r--changelog.d/17479.misc1
-rw-r--r--synapse/handlers/sliding_sync.py51
-rw-r--r--synapse/storage/databases/main/stream.py10
-rw-r--r--synapse/types/handlers/__init__.py17
-rw-r--r--tests/rest/client/test_sync.py89
5 files changed, 138 insertions, 30 deletions
diff --git a/changelog.d/17479.misc b/changelog.d/17479.misc
new file mode 100644
index 0000000000..4502f71662
--- /dev/null
+++ b/changelog.d/17479.misc
@@ -0,0 +1 @@
+Do not send down empty room entries down experimental sliding sync endpoint.
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 2b74f1c9c9..84f2fa18ff 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -619,6 +619,51 @@ class SlidingSyncHandler:
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
 
+        # Filter out rooms that haven't received updates and we've sent down
+        # previously.
+        if from_token:
+            rooms_should_send = set()
+
+            # First we check if there are rooms that match a list/room
+            # subscription and have updates we need to send (i.e. either because
+            # we haven't sent the room down, or we have but there are missing
+            # updates).
+            for room_id in relevant_room_map:
+                status = await self.connection_store.have_sent_room(
+                    sync_config,
+                    from_token.connection_position,
+                    room_id,
+                )
+                if (
+                    # The room was never sent down before so the client needs to know
+                    # about it regardless of any updates.
+                    status.status == HaveSentRoomFlag.NEVER
+                    # `PREVIOUSLY` literally means the "room was sent down before *AND*
+                    # there are updates we haven't sent down" so we already know this
+                    # room has updates.
+                    or status.status == HaveSentRoomFlag.PREVIOUSLY
+                ):
+                    rooms_should_send.add(room_id)
+                elif status.status == HaveSentRoomFlag.LIVE:
+                    # We know that we've sent all updates up until `from_token`,
+                    # so we just need to check if there have been updates since
+                    # then.
+                    pass
+                else:
+                    assert_never(status.status)
+
+            # We only need to check for new events since any state changes
+            # will also come down as new events.
+            rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
+                relevant_room_map.keys(), from_token.stream_token.room_key
+            )
+            rooms_should_send.update(rooms_that_have_updates)
+            relevant_room_map = {
+                room_id: room_sync_config
+                for room_id, room_sync_config in relevant_room_map.items()
+                if room_id in rooms_should_send
+            }
+
         @trace
         @tag_args
         async def handle_room(room_id: str) -> None:
@@ -633,7 +678,9 @@ class SlidingSyncHandler:
                 to_token=to_token,
             )
 
-            rooms[room_id] = room_sync_result
+            # Filter out empty room results during incremental sync
+            if room_sync_result or not from_token:
+                rooms[room_id] = room_sync_result
 
         with start_active_span("sliding_sync.generate_room_entries"):
             await concurrently_execute(handle_room, relevant_room_map, 10)
@@ -2198,7 +2245,7 @@ class SlidingSyncConnectionStore:
     a connection position of 5 might have totally different states on worker A and
     worker B.
 
-     One complication that we need to deal with here is needing to handle requests being
+    One complication that we need to deal with here is needing to handle requests being
     resent, i.e. if we sent down a room in a response that the client received, we must
     consider the room *not* sent when we get the request again.
 
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b034361aec..4207e73c7f 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -2104,3 +2104,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             return RoomStreamToken(stream=last_position.stream - 1)
 
         return None
+
+    def get_rooms_that_might_have_updates(
+        self, room_ids: StrCollection, from_token: RoomStreamToken
+    ) -> StrCollection:
+        """Filters given room IDs down to those that might have updates, i.e.
+        removes rooms that definitely do not have updates.
+        """
+        return self._events_stream_cache.get_entities_changed(
+            room_ids, from_token.stream
+        )
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index f3141b05a0..f26cc0e903 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -238,6 +238,17 @@ class SlidingSyncResult:
         notification_count: int
         highlight_count: int
 
+        def __bool__(self) -> bool:
+            return (
+                # If this is the first time the client is seeing the room, we should not filter it out
+                # under any circumstance.
+                self.initial
+                # We need to let the client know if there are any new events
+                or bool(self.required_state)
+                or bool(self.timeline_events)
+                or bool(self.stripped_state)
+            )
+
     @attr.s(slots=True, frozen=True, auto_attribs=True)
     class SlidingWindowList:
         """
@@ -367,7 +378,11 @@ class SlidingSyncResult:
         to tell if the notifier needs to wait for more events when polling for
         events.
         """
-        return bool(self.lists or self.rooms or self.extensions)
+        # We don't include `self.lists` here, as a) `lists` is always non-empty even if
+        # there are no changes, and b) since we're sorting rooms by `stream_ordering` of
+        # the latest activity, anything that would cause the order to change would end
+        # up in `self.rooms` and cause us to send down the change.
+        return bool(self.rooms or self.extensions)
 
     @staticmethod
     def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 3e7b8f76a1..5abf1041be 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -69,7 +69,6 @@ from tests.federation.transport.test_knocking import (
 )
 from tests.server import TimedOutException
 from tests.test_utils.event_injection import create_event, mark_event_as_partial_state
-from tests.unittest import skip_unless
 
 logger = logging.getLogger(__name__)
 
@@ -1656,12 +1655,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
             channel.json_body["rooms"][room_id]["timeline"],
         )
 
-    # TODO: Once we remove `ops`, we should be able to add a `RoomResult.__bool__` to
-    # check if there are any updates since the `from_token`.
-    @skip_unless(
-        False,
-        "Once we remove ops from the Sliding Sync response, this test should pass",
-    )
     def test_wait_for_new_data_timeout(self) -> None:
         """
         Test to make sure that the Sliding Sync request waits for new data to arrive but
@@ -1711,12 +1704,8 @@ class SlidingSyncTestCase(SlidingSyncBase):
         channel.await_result(timeout_ms=1200)
         self.assertEqual(channel.code, 200, channel.json_body)
 
-        # We still see rooms because that's how Sliding Sync lists work but we reached
-        # the timeout before seeing them
-        self.assertEqual(
-            [event["event_id"] for event in channel.json_body["rooms"].keys()],
-            [room_id],
-        )
+        # There should be no room sent down.
+        self.assertFalse(channel.json_body["rooms"])
 
     def test_filter_list(self) -> None:
         """
@@ -3556,19 +3545,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
         response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # Nothing to see for this banned user in the room in the token range
-        self.assertIsNone(response_body["rooms"][room_id1].get("timeline"))
-        # No events returned in the timeline so nothing is "live"
-        self.assertEqual(
-            response_body["rooms"][room_id1]["num_live"],
-            0,
-            response_body["rooms"][room_id1],
-        )
-        # There aren't anymore events to paginate to in this range
-        self.assertEqual(
-            response_body["rooms"][room_id1]["limited"],
-            False,
-            response_body["rooms"][room_id1],
-        )
+        self.assertIsNone(response_body["rooms"].get(room_id1))
 
     def test_rooms_no_required_state(self) -> None:
         """
@@ -3668,12 +3645,15 @@ class SlidingSyncTestCase(SlidingSyncBase):
                         # This one doesn't exist in the room
                         [EventTypes.Tombstone, ""],
                     ],
-                    "timeline_limit": 0,
+                    "timeline_limit": 1,
                 }
             }
         }
         _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
+        # Send a message so the room comes down sync.
+        self.helper.send(room_id1, "msg", tok=user1_tok)
+
         # Make the incremental Sliding Sync request
         response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
@@ -4880,6 +4860,61 @@ class SlidingSyncTestCase(SlidingSyncBase):
         self.assertEqual(response_body["rooms"][room_id1]["limited"], True)
         self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
 
+    def test_rooms_with_no_updates_do_not_come_down_incremental_sync(self) -> None:
+        """
+        Test that rooms with no updates are returned in subsequent incremental
+        syncs.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            }
+        }
+
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make the incremental Sliding Sync request
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # Nothing has happened in the room, so the room should not come down
+        # /sync.
+        self.assertIsNone(response_body["rooms"].get(room_id1))
+
+    def test_empty_initial_room_comes_down_sync(self) -> None:
+        """
+        Test that rooms come down /sync even with empty required state and
+        timeline limit in initial sync.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            }
+        }
+
+        # Make the Sliding Sync request
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+        self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
+
 
 class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
     """Tests for the to-device sliding sync extension"""