diff options
author | Erik Johnston <erikj@element.io> | 2024-07-29 22:45:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-29 22:45:48 +0100 |
commit | be4a16ff445c9dfba04aeaed695afb3a56e204f7 (patch) | |
tree | 62de0ad9059e727a072ea06743afbf21765991ae /tests/rest/client/test_sync.py | |
parent | Refactor Sliding Sync tests to better utilize the `SlidingSyncBase.do_sync(..... (diff) | |
download | synapse-be4a16ff445c9dfba04aeaed695afb3a56e204f7.tar.xz |
Sliding Sync: Track whether we have sent rooms down to clients (#17447)
The basic idea is that we introduce a new token for a sliding sync connection, which stores the mapping of room to room "status" (i.e. have we sent the room down?). This token allows us to handle duplicate requests properly. In future it can be used to store more "per-connection" information safely. In future this should be migrated into the DB, so its important that we try to reduce the number of syncs where we need to update the per-connection information. In this PoC this only happens when we: a) send down a set of room for the first time, or b) we have previously sent down a room and there are updates but we are not sending the room down the sync (due to not falling in a list range) Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
Diffstat (limited to 'tests/rest/client/test_sync.py')
-rw-r--r-- | tests/rest/client/test_sync.py | 452 |
1 files changed, 449 insertions, 3 deletions
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 2bbbd95a76..3e7b8f76a1 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -59,6 +59,7 @@ from synapse.types import ( StreamToken, UserID, ) +from synapse.types.handlers import SlidingSyncConfig from synapse.util import Clock from synapse.util.stringutils import random_string @@ -3676,13 +3677,52 @@ class SlidingSyncTestCase(SlidingSyncBase): # Make the incremental Sliding Sync request response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + # We only return updates but only if we've sent the room down the + # connection before. + self.assertIsNone(response_body["rooms"][room_id1].get("required_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) + + def test_rooms_required_state_incremental_sync_restart(self) -> None: + """ + Test `rooms.required_state` returns requested state events in the room during an + incremental sync, after a restart (and so the in memory caches are reset). + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Tombstone, ""], + ], + "timeline_limit": 1, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Reset the in-memory cache + self.hs.get_sliding_sync_handler().connection_store._connections.clear() + + # Make the Sliding Sync request + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # If the cache has been cleared then we do expect the state to come down state_map = self.get_success( self.storage_controllers.state.get_current_state(room_id1) ) - # The returned state doesn't change from initial to incremental sync. In the - # future, we will only return updates but only if we've sent the room down the - # connection before. self._assertRequiredStateIncludes( response_body["rooms"][room_id1]["required_state"], { @@ -4434,6 +4474,412 @@ class SlidingSyncTestCase(SlidingSyncBase): # `world_readable` but currently we don't support this. self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"]) + def test_rooms_required_state_incremental_sync_LIVE(self) -> None: + """Test that we only get state updates in incremental sync for rooms + we've already seen (LIVE). + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Name, ""], + ], + "timeline_limit": 0, + } + } + } + + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + state_map[(EventTypes.RoomHistoryVisibility, "")], + }, + exact=True, + ) + + # Send a state event + self.helper.send_state( + room_id1, EventTypes.Name, body={"name": "foo"}, tok=user2_tok + ) + + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + self.assertNotIn("initial", response_body["rooms"][room_id1]) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Name, "")], + }, + exact=True, + ) + + @parameterized.expand([(False,), (True,)]) + def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None: + """ + Test getting room data where we have previously sent down the room, but + we missed sending down some timeline events previously and so its status + is considered PREVIOUSLY. + + There are two versions of this test, one where there are more messages + than the timeline limit, and one where there isn't. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + + self.helper.send(room_id1, "msg", tok=user1_tok) + + timeline_limit = 5 + conn_id = "conn_id" + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": timeline_limit, + } + }, + "conn_id": "conn_id", + } + + # The first room gets sent down the initial sync + response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok) + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + + # We now send down some events in room1 (depending on the test param). + expected_events = [] # The set of events in the timeline + if limited: + for _ in range(10): + resp = self.helper.send(room_id1, "msg1", tok=user1_tok) + expected_events.append(resp["event_id"]) + else: + resp = self.helper.send(room_id1, "msg1", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # A second messages happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync( + sync_body, since=initial_from_token, tok=user1_tok + ) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # FIXME: This is a hack to record that the first room wasn't sent down + # sync, as we don't implement that currently. + sliding_sync_handler = self.hs.get_sliding_sync_handler() + requester = self.get_success( + self.hs.get_auth().get_user_by_access_token(user1_tok) + ) + sync_config = SlidingSyncConfig( + user=requester.user, + requester=requester, + conn_id=conn_id, + ) + + parsed_initial_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, initial_from_token) + ) + connection_position = self.get_success( + sliding_sync_handler.connection_store.record_rooms( + sync_config, + parsed_initial_from_token, + sent_room_ids=[], + unsent_room_ids=[room_id1], + ) + ) + + # FIXME: Now fix up `from_token` with new connect position above. + parsed_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ) + parsed_from_token = SlidingSyncStreamToken( + stream_token=parsed_from_token.stream_token, + connection_position=connection_position, + ) + from_token = self.get_success(parsed_from_token.to_string(self.store)) + + # We now send another event to room1, so we should sync all the missing events. + resp = self.helper.send(room_id1, "msg2", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # This sync should contain the messages from room1 not yet sent down. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + self.assertNotIn("initial", response_body["rooms"][room_id1]) + + self.assertEqual( + [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]], + expected_events[-timeline_limit:], + ) + self.assertEqual(response_body["rooms"][room_id1]["limited"], limited) + self.assertEqual(response_body["rooms"][room_id1].get("required_state"), None) + + def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None: + """ + Test getting room data where we have previously sent down the room, but + we missed sending down some state previously and so its status is + considered PREVIOUSLY. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + + self.helper.send(room_id1, "msg", tok=user1_tok) + + conn_id = "conn_id" + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Name, ""], + ], + "timeline_limit": 0, + } + }, + "conn_id": "conn_id", + } + + # The first room gets sent down the initial sync + response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok) + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + + # We now send down some state in room1 + resp = self.helper.send_state( + room_id1, EventTypes.Name, {"name": "foo"}, tok=user1_tok + ) + name_change_id = resp["event_id"] + + # A second messages happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync( + sync_body, since=initial_from_token, tok=user1_tok + ) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # FIXME: This is a hack to record that the first room wasn't sent down + # sync, as we don't implement that currently. + sliding_sync_handler = self.hs.get_sliding_sync_handler() + requester = self.get_success( + self.hs.get_auth().get_user_by_access_token(user1_tok) + ) + sync_config = SlidingSyncConfig( + user=requester.user, + requester=requester, + conn_id=conn_id, + ) + + parsed_initial_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, initial_from_token) + ) + connection_position = self.get_success( + sliding_sync_handler.connection_store.record_rooms( + sync_config, + parsed_initial_from_token, + sent_room_ids=[], + unsent_room_ids=[room_id1], + ) + ) + + # FIXME: Now fix up `from_token` with new connect position above. + parsed_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ) + parsed_from_token = SlidingSyncStreamToken( + stream_token=parsed_from_token.stream_token, + connection_position=connection_position, + ) + from_token = self.get_success(parsed_from_token.to_string(self.store)) + + # We now send another event to room1, so we should sync all the missing state. + self.helper.send(room_id1, "msg", tok=user1_tok) + + # This sync should contain the state changes from room1. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + self.assertNotIn("initial", response_body["rooms"][room_id1]) + + # We should only see the name change. + self.assertEqual( + [ + ev["event_id"] + for ev in response_body["rooms"][room_id1]["required_state"] + ], + [name_change_id], + ) + + def test_rooms_required_state_incremental_sync_NEVER(self) -> None: + """ + Test getting `required_state` where we have NEVER sent down the room before + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + + self.helper.send(room_id1, "msg", tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Name, ""], + ], + "timeline_limit": 1, + } + }, + } + + # A message happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # We now send another event to room1, so we should send down the full + # room. + self.helper.send(room_id1, "msg2", tok=user1_tok) + + # This sync should contain the messages from room1 not yet sent down. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + state_map[(EventTypes.RoomHistoryVisibility, "")], + }, + exact=True, + ) + + def test_rooms_timeline_incremental_sync_NEVER(self) -> None: + """ + Test getting timeline room data where we have NEVER sent down the room + before + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": 5, + } + }, + } + + expected_events = [] + for _ in range(4): + resp = self.helper.send(room_id1, "msg", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # A message happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # We now send another event to room1 so it comes down sync + resp = self.helper.send(room_id1, "msg2", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # This sync should contain the messages from room1 not yet sent down. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + + self.assertEqual( + [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]], + expected_events, + ) + self.assertEqual(response_body["rooms"][room_id1]["limited"], True) + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): """Tests for the to-device sliding sync extension""" |