diff options
-rw-r--r-- | changelog.d/17479.misc | 1 | ||||
-rw-r--r-- | synapse/handlers/sliding_sync.py | 51 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 10 | ||||
-rw-r--r-- | synapse/types/handlers/__init__.py | 17 | ||||
-rw-r--r-- | tests/rest/client/test_sync.py | 89 |
5 files changed, 138 insertions, 30 deletions
diff --git a/changelog.d/17479.misc b/changelog.d/17479.misc new file mode 100644 index 0000000000..4502f71662 --- /dev/null +++ b/changelog.d/17479.misc @@ -0,0 +1 @@ +Do not send down empty room entries down experimental sliding sync endpoint. diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 2b74f1c9c9..84f2fa18ff 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -619,6 +619,51 @@ class SlidingSyncHandler: # Fetch room data rooms: Dict[str, SlidingSyncResult.RoomResult] = {} + # Filter out rooms that haven't received updates and we've sent down + # previously. + if from_token: + rooms_should_send = set() + + # First we check if there are rooms that match a list/room + # subscription and have updates we need to send (i.e. either because + # we haven't sent the room down, or we have but there are missing + # updates). + for room_id in relevant_room_map: + status = await self.connection_store.have_sent_room( + sync_config, + from_token.connection_position, + room_id, + ) + if ( + # The room was never sent down before so the client needs to know + # about it regardless of any updates. + status.status == HaveSentRoomFlag.NEVER + # `PREVIOUSLY` literally means the "room was sent down before *AND* + # there are updates we haven't sent down" so we already know this + # room has updates. + or status.status == HaveSentRoomFlag.PREVIOUSLY + ): + rooms_should_send.add(room_id) + elif status.status == HaveSentRoomFlag.LIVE: + # We know that we've sent all updates up until `from_token`, + # so we just need to check if there have been updates since + # then. + pass + else: + assert_never(status.status) + + # We only need to check for new events since any state changes + # will also come down as new events. + rooms_that_have_updates = self.store.get_rooms_that_might_have_updates( + relevant_room_map.keys(), from_token.stream_token.room_key + ) + rooms_should_send.update(rooms_that_have_updates) + relevant_room_map = { + room_id: room_sync_config + for room_id, room_sync_config in relevant_room_map.items() + if room_id in rooms_should_send + } + @trace @tag_args async def handle_room(room_id: str) -> None: @@ -633,7 +678,9 @@ class SlidingSyncHandler: to_token=to_token, ) - rooms[room_id] = room_sync_result + # Filter out empty room results during incremental sync + if room_sync_result or not from_token: + rooms[room_id] = room_sync_result with start_active_span("sliding_sync.generate_room_entries"): await concurrently_execute(handle_room, relevant_room_map, 10) @@ -2198,7 +2245,7 @@ class SlidingSyncConnectionStore: a connection position of 5 might have totally different states on worker A and worker B. - One complication that we need to deal with here is needing to handle requests being + One complication that we need to deal with here is needing to handle requests being resent, i.e. if we sent down a room in a response that the client received, we must consider the room *not* sent when we get the request again. diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index b034361aec..4207e73c7f 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -2104,3 +2104,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return RoomStreamToken(stream=last_position.stream - 1) return None + + def get_rooms_that_might_have_updates( + self, room_ids: StrCollection, from_token: RoomStreamToken + ) -> StrCollection: + """Filters given room IDs down to those that might have updates, i.e. + removes rooms that definitely do not have updates. + """ + return self._events_stream_cache.get_entities_changed( + room_ids, from_token.stream + ) diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index f3141b05a0..f26cc0e903 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -238,6 +238,17 @@ class SlidingSyncResult: notification_count: int highlight_count: int + def __bool__(self) -> bool: + return ( + # If this is the first time the client is seeing the room, we should not filter it out + # under any circumstance. + self.initial + # We need to let the client know if there are any new events + or bool(self.required_state) + or bool(self.timeline_events) + or bool(self.stripped_state) + ) + @attr.s(slots=True, frozen=True, auto_attribs=True) class SlidingWindowList: """ @@ -367,7 +378,11 @@ class SlidingSyncResult: to tell if the notifier needs to wait for more events when polling for events. """ - return bool(self.lists or self.rooms or self.extensions) + # We don't include `self.lists` here, as a) `lists` is always non-empty even if + # there are no changes, and b) since we're sorting rooms by `stream_ordering` of + # the latest activity, anything that would cause the order to change would end + # up in `self.rooms` and cause us to send down the change. + return bool(self.rooms or self.extensions) @staticmethod def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult": diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 3e7b8f76a1..5abf1041be 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -69,7 +69,6 @@ from tests.federation.transport.test_knocking import ( ) from tests.server import TimedOutException from tests.test_utils.event_injection import create_event, mark_event_as_partial_state -from tests.unittest import skip_unless logger = logging.getLogger(__name__) @@ -1656,12 +1655,6 @@ class SlidingSyncTestCase(SlidingSyncBase): channel.json_body["rooms"][room_id]["timeline"], ) - # TODO: Once we remove `ops`, we should be able to add a `RoomResult.__bool__` to - # check if there are any updates since the `from_token`. - @skip_unless( - False, - "Once we remove ops from the Sliding Sync response, this test should pass", - ) def test_wait_for_new_data_timeout(self) -> None: """ Test to make sure that the Sliding Sync request waits for new data to arrive but @@ -1711,12 +1704,8 @@ class SlidingSyncTestCase(SlidingSyncBase): channel.await_result(timeout_ms=1200) self.assertEqual(channel.code, 200, channel.json_body) - # We still see rooms because that's how Sliding Sync lists work but we reached - # the timeout before seeing them - self.assertEqual( - [event["event_id"] for event in channel.json_body["rooms"].keys()], - [room_id], - ) + # There should be no room sent down. + self.assertFalse(channel.json_body["rooms"]) def test_filter_list(self) -> None: """ @@ -3556,19 +3545,7 @@ class SlidingSyncTestCase(SlidingSyncBase): response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # Nothing to see for this banned user in the room in the token range - self.assertIsNone(response_body["rooms"][room_id1].get("timeline")) - # No events returned in the timeline so nothing is "live" - self.assertEqual( - response_body["rooms"][room_id1]["num_live"], - 0, - response_body["rooms"][room_id1], - ) - # There aren't anymore events to paginate to in this range - self.assertEqual( - response_body["rooms"][room_id1]["limited"], - False, - response_body["rooms"][room_id1], - ) + self.assertIsNone(response_body["rooms"].get(room_id1)) def test_rooms_no_required_state(self) -> None: """ @@ -3668,12 +3645,15 @@ class SlidingSyncTestCase(SlidingSyncBase): # This one doesn't exist in the room [EventTypes.Tombstone, ""], ], - "timeline_limit": 0, + "timeline_limit": 1, } } } _, from_token = self.do_sync(sync_body, tok=user1_tok) + # Send a message so the room comes down sync. + self.helper.send(room_id1, "msg", tok=user1_tok) + # Make the incremental Sliding Sync request response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) @@ -4880,6 +4860,61 @@ class SlidingSyncTestCase(SlidingSyncBase): self.assertEqual(response_body["rooms"][room_id1]["limited"], True) self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + def test_rooms_with_no_updates_do_not_come_down_incremental_sync(self) -> None: + """ + Test that rooms with no updates are returned in subsequent incremental + syncs. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, + } + } + } + + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Make the incremental Sliding Sync request + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # Nothing has happened in the room, so the room should not come down + # /sync. + self.assertIsNone(response_body["rooms"].get(room_id1)) + + def test_empty_initial_room_comes_down_sync(self) -> None: + """ + Test that rooms come down /sync even with empty required state and + timeline limit in initial sync. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, + } + } + } + + # Make the Sliding Sync request + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): """Tests for the to-device sliding sync extension""" |