diff options
-rw-r--r-- | synapse/handlers/account_data.py | 1 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 4 | ||||
-rw-r--r-- | synapse/handlers/sliding_sync.py | 222 | ||||
-rw-r--r-- | synapse/rest/client/sync.py | 6 | ||||
-rw-r--r-- | synapse/types/handlers/__init__.py | 18 | ||||
-rw-r--r-- | synapse/types/rest/client/__init__.py | 18 | ||||
-rw-r--r-- | tests/rest/client/test_sync.py | 835 |
7 files changed, 870 insertions, 234 deletions
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 97a463d8d0..8041326cd5 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -327,6 +327,7 @@ class AccountDataEventSource(EventSource[int, JsonDict]): explicit_room_id: Optional[str] = None, ) -> Tuple[List[JsonDict], int]: user_id = user.to_string() + # TODO: Take `to_key` into account last_stream_id = from_key current_stream_id = self.store.get_max_account_data_stream_id() diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 8674a8fcdd..d04c76be2a 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -286,8 +286,10 @@ class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]): room_ids: Iterable[str], is_guest: bool, explicit_room_id: Optional[str] = None, + to_key: Optional[MultiWriterStreamToken] = None, ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]: - to_key = self.get_current_key() + if to_key is None: + to_key = self.get_current_key() if from_key == to_key: return [], to_key diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 3231574402..0c7299137d 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -47,6 +47,7 @@ from synapse.types import ( DeviceListUpdates, JsonDict, JsonMapping, + MultiWriterStreamToken, PersistedEventPosition, Requester, RoomStreamToken, @@ -630,7 +631,8 @@ class SlidingSyncHandler: extensions = await self.get_extensions_response( sync_config=sync_config, - lists=lists, + actual_lists=lists, + actual_room_ids=set(rooms.keys()), from_token=from_token, to_token=to_token, ) @@ -1800,7 +1802,8 @@ class SlidingSyncHandler: async def get_extensions_response( self, sync_config: SlidingSyncConfig, - lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], to_token: StreamToken, from_token: Optional[SlidingSyncStreamToken], ) -> SlidingSyncResult.Extensions: @@ -1808,7 +1811,9 @@ class SlidingSyncHandler: Args: sync_config: Sync configuration - lists: Sliding window API. A map of list key to list results. + actual_lists: Sliding window API. A map of list key to list results in the + Sliding Sync response. + actual_room_ids: The actual room IDs in the the Sliding Sync response. to_token: The point in the stream to sync up to. from_token: The point in the stream to sync from. """ @@ -1837,18 +1842,102 @@ class SlidingSyncHandler: if sync_config.extensions.account_data is not None: account_data_response = await self.get_account_data_extension_response( sync_config=sync_config, - lists=lists, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, account_data_request=sync_config.extensions.account_data, to_token=to_token, from_token=from_token, ) + receipts_response = None + if sync_config.extensions.receipts is not None: + receipts_response = await self.get_receipts_extension_response( + sync_config=sync_config, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + receipts_request=sync_config.extensions.receipts, + to_token=to_token, + from_token=from_token, + ) + return SlidingSyncResult.Extensions( to_device=to_device_response, e2ee=e2ee_response, account_data=account_data_response, + receipts=receipts_response, ) + def find_relevant_room_ids_for_extension( + self, + requested_lists: Optional[List[str]], + requested_room_ids: Optional[List[str]], + actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], + ) -> Set[str]: + """ + Handle the reserved `lists`/`rooms` keys for extensions. Extensions should only + return results for rooms in the Sliding Sync response. This matches up the + requested rooms/lists with the actual lists/rooms in the Sliding Sync response. + + {"lists": []} // Do not process any lists. + {"lists": ["rooms", "dms"]} // Process only a subset of lists. + {"lists": ["*"]} // Process all lists defined in the Sliding Window API. (This is the default.) + + {"rooms": []} // Do not process any specific rooms. + {"rooms": ["!a:b", "!c:d"]} // Process only a subset of room subscriptions. + {"rooms": ["*"]} // Process all room subscriptions defined in the Room Subscription API. (This is the default.) + + Args: + requested_lists: The `lists` from the extension request. + requested_room_ids: The `rooms` from the extension request. + actual_lists: The actual lists from the Sliding Sync response. + actual_room_ids: The actual room subscriptions from the Sliding Sync request. + """ + + # We only want to include account data for rooms that are already in the sliding + # sync response AND that were requested in the account data request. + relevant_room_ids: Set[str] = set() + + # See what rooms from the room subscriptions we should get account data for + if requested_room_ids is not None: + for room_id in requested_room_ids: + # A wildcard means we process all rooms from the room subscriptions + if room_id == "*": + relevant_room_ids.update(actual_room_ids) + break + + if room_id in actual_room_ids: + relevant_room_ids.add(room_id) + + # See what rooms from the sliding window lists we should get account data for + if requested_lists is not None: + for list_key in requested_lists: + # Just some typing because we share the variable name in multiple places + actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None + + # A wildcard means we process rooms from all lists + if list_key == "*": + for actual_list in actual_lists.values(): + # We only expect a single SYNC operation for any list + assert len(actual_list.ops) == 1 + sync_op = actual_list.ops[0] + assert sync_op.op == OperationType.SYNC + + relevant_room_ids.update(sync_op.room_ids) + + break + + actual_list = actual_lists.get(list_key) + if actual_list is not None: + # We only expect a single SYNC operation for any list + assert len(actual_list.ops) == 1 + sync_op = actual_list.ops[0] + assert sync_op.op == OperationType.SYNC + + relevant_room_ids.update(sync_op.room_ids) + + return relevant_room_ids + async def get_to_device_extension_response( self, sync_config: SlidingSyncConfig, @@ -1976,7 +2065,8 @@ class SlidingSyncHandler: async def get_account_data_extension_response( self, sync_config: SlidingSyncConfig, - lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension, to_token: StreamToken, from_token: Optional[SlidingSyncStreamToken], @@ -1985,7 +2075,9 @@ class SlidingSyncHandler: Args: sync_config: Sync configuration - lists: Sliding window API. A map of list key to list results. + actual_lists: Sliding window API. A map of list key to list results in the + Sliding Sync response. + actual_room_ids: The actual room IDs in the the Sliding Sync response. account_data_request: The account_data extension from the request to_token: The point in the stream to sync up to. from_token: The point in the stream to sync from. @@ -1998,6 +2090,7 @@ class SlidingSyncHandler: global_account_data_map: Mapping[str, JsonMapping] = {} if from_token is not None: + # TODO: This should take into account the `from_token` and `to_token` global_account_data_map = ( await self.store.get_updated_global_account_data_for_user( user_id, from_token.stream_token.account_data_key @@ -2009,76 +2102,40 @@ class SlidingSyncHandler: ) if have_push_rules_changed: global_account_data_map = dict(global_account_data_map) + # TODO: This should take into account the `from_token` and `to_token` global_account_data_map[AccountDataTypes.PUSH_RULES] = ( await self.push_rules_handler.push_rules_for_user(sync_config.user) ) else: + # TODO: This should take into account the `to_token` all_global_account_data = await self.store.get_global_account_data_for_user( user_id ) global_account_data_map = dict(all_global_account_data) + # TODO: This should take into account the `to_token` global_account_data_map[AccountDataTypes.PUSH_RULES] = ( await self.push_rules_handler.push_rules_for_user(sync_config.user) ) - # We only want to include account data for rooms that are already in the sliding - # sync response AND that were requested in the account data request. - relevant_room_ids: Set[str] = set() - - # See what rooms from the room subscriptions we should get account data for - if ( - account_data_request.rooms is not None - and sync_config.room_subscriptions is not None - ): - actual_room_ids = sync_config.room_subscriptions.keys() - - for room_id in account_data_request.rooms: - # A wildcard means we process all rooms from the room subscriptions - if room_id == "*": - relevant_room_ids.update(sync_config.room_subscriptions.keys()) - break - - if room_id in actual_room_ids: - relevant_room_ids.add(room_id) - - # See what rooms from the sliding window lists we should get account data for - if account_data_request.lists is not None: - for list_key in account_data_request.lists: - # Just some typing because we share the variable name in multiple places - actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None - - # A wildcard means we process rooms from all lists - if list_key == "*": - for actual_list in lists.values(): - # We only expect a single SYNC operation for any list - assert len(actual_list.ops) == 1 - sync_op = actual_list.ops[0] - assert sync_op.op == OperationType.SYNC - - relevant_room_ids.update(sync_op.room_ids) - - break - - actual_list = lists.get(list_key) - if actual_list is not None: - # We only expect a single SYNC operation for any list - assert len(actual_list.ops) == 1 - sync_op = actual_list.ops[0] - assert sync_op.op == OperationType.SYNC - - relevant_room_ids.update(sync_op.room_ids) - # Fetch room account data account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {} + relevant_room_ids = self.find_relevant_room_ids_for_extension( + requested_lists=account_data_request.lists, + requested_room_ids=account_data_request.rooms, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + ) if len(relevant_room_ids) > 0: if from_token is not None: + # TODO: This should take into account the `from_token` and `to_token` account_data_by_room_map = ( await self.store.get_updated_room_account_data_for_user( user_id, from_token.stream_token.account_data_key ) ) else: + # TODO: This should take into account the `to_token` account_data_by_room_map = ( await self.store.get_room_account_data_for_user(user_id) ) @@ -2094,3 +2151,62 @@ class SlidingSyncHandler: global_account_data_map=global_account_data_map, account_data_by_room_map=account_data_by_room_map, ) + + async def get_receipts_extension_response( + self, + sync_config: SlidingSyncConfig, + actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], + receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension, + to_token: StreamToken, + from_token: Optional[SlidingSyncStreamToken], + ) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]: + """Handle Receipts extension (MSC3960) + + Args: + sync_config: Sync configuration + actual_lists: Sliding window API. A map of list key to list results in the + Sliding Sync response. + actual_room_ids: The actual room IDs in the the Sliding Sync response. + account_data_request: The account_data extension from the request + to_token: The point in the stream to sync up to. + from_token: The point in the stream to sync from. + """ + # Skip if the extension is not enabled + if not receipts_request.enabled: + return None + + relevant_room_ids = self.find_relevant_room_ids_for_extension( + requested_lists=receipts_request.lists, + requested_room_ids=receipts_request.rooms, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + ) + + room_id_to_receipt_map: Mapping[str, JsonMapping] = {} + if len(relevant_room_ids) > 0: + receipt_source = self.event_sources.sources.receipt + receipts, _ = await receipt_source.get_new_events( + user=sync_config.user, + from_key=( + from_token.stream_token.receipt_key + if from_token + else MultiWriterStreamToken(stream=0) + ), + to_key=to_token.receipt_key, + # This is a dummy value and isn't used in the function + limit=0, + room_ids=relevant_room_ids, + is_guest=False, + ) + + for receipt in receipts: + # These fields should exist for every receipt + room_id = receipt["room_id"] + type = receipt["type"] + content = receipt["content"] + room_id_to_receipt_map[room_id] = {"type": type, "content": content} + + return SlidingSyncResult.Extensions.ReceiptsExtension( + room_id_to_receipt_map=room_id_to_receipt_map, + ) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 7cf1f56435..268c6521e0 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -1134,6 +1134,12 @@ class SlidingSyncRestServlet(RestServlet): }, } + if extensions.receipts is not None: + serialized_extensions["receipts"] = { + # Same as the the top-level `account_data.events` field in Sync v2. + "rooms": extensions.receipts.room_id_to_receipt_map, + } + return serialized_extensions diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 479222a18d..488ebd8365 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -349,12 +349,28 @@ class SlidingSyncResult: self.global_account_data_map or self.account_data_by_room_map ) + @attr.s(slots=True, frozen=True, auto_attribs=True) + class ReceiptsExtension: + """The Receipts extension (MSC3960) + + Attributes: + room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content) + """ + + room_id_to_receipt_map: Mapping[str, JsonMapping] + + def __bool__(self) -> bool: + return bool(self.room_id_to_receipt_map) + to_device: Optional[ToDeviceExtension] = None e2ee: Optional[E2eeExtension] = None account_data: Optional[AccountDataExtension] = None + receipts: Optional[ReceiptsExtension] = None def __bool__(self) -> bool: - return bool(self.to_device or self.e2ee or self.account_data) + return bool( + self.to_device or self.e2ee or self.account_data or self.receipts + ) next_pos: SlidingSyncStreamToken lists: Dict[str, SlidingWindowList] diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index 34e07ddac5..ae8b7d8144 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -339,9 +339,27 @@ class SlidingSyncBody(RequestBodyModel): # Process all room subscriptions defined in the Room Subscription API. (This is the default.) rooms: Optional[List[StrictStr]] = ["*"] + class ReceiptsExtension(RequestBodyModel): + """The Receipts extension (MSC3960) + + Attributes: + enabled + lists: List of list keys (from the Sliding Window API) to apply this + extension to. + rooms: List of room IDs (from the Room Subscription API) to apply this + extension to. + """ + + enabled: Optional[StrictBool] = False + # Process all lists defined in the Sliding Window API. (This is the default.) + lists: Optional[List[StrictStr]] = ["*"] + # Process all room subscriptions defined in the Room Subscription API. (This is the default.) + rooms: Optional[List[StrictStr]] = ["*"] + to_device: Optional[ToDeviceExtension] = None e2ee: Optional[E2eeExtension] = None account_data: Optional[AccountDataExtension] = None + receipts: Optional[ReceiptsExtension] = None # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 if TYPE_CHECKING: diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 2bbbd95a76..4422deb884 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -1369,6 +1369,7 @@ class SlidingSyncTestCase(SlidingSyncBase): room.register_servlets, sync.register_servlets, devices.register_servlets, + receipts.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: @@ -4434,6 +4435,225 @@ class SlidingSyncTestCase(SlidingSyncBase): # `world_readable` but currently we don't support this. self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"]) + # Any extensions that use `lists`/`rooms` should be tested here + @parameterized.expand([("account_data",), ("receipts",)]) + def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> None: + """ + With various extensions, test out requesting different variations of + `lists`/`rooms`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create some rooms + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok) + + room_id_to_human_name_map = { + room_id1: "room1", + room_id2: "room2", + room_id3: "room3", + room_id4: "room4", + room_id5: "room5", + } + + for room_id in room_id_to_human_name_map.keys(): + if extension_name == "account_data": + # Add some account data to each room + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + elif extension_name == "receipts": + event_response = self.helper.send( + room_id, body="new event", tok=user1_tok + ) + # Read last event + channel = self.make_request( + "POST", + f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response["event_id"]}", + {}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + else: + raise AssertionError(f"Unknown extension name: {extension_name}") + + main_sync_body = { + "lists": { + # We expect this list range to include room5 and room4 + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, + }, + # We expect this list range to include room5, room4, room3 + "bar-list": { + "ranges": [[0, 2]], + "required_state": [], + "timeline_limit": 0, + }, + }, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + } + }, + } + + # Mix lists and rooms + sync_body = { + **main_sync_body, + "extensions": { + extension_name: { + "enabled": True, + "lists": ["foo-list", "non-existent-list"], + "rooms": [room_id1, room_id2, "!non-existent-room"], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ✅ Requested via `rooms` and a room subscription exists + # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions) + # room3: ❌ Not requested + # room4: ✅ Shows up because requested via `lists` and list exists in the response + # room5: ✅ Shows up because requested via `lists` and list exists in the response + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"][extension_name] + .get("rooms") + .keys() + }, + {"room1", "room4", "room5"}, + exact=True, + ) + + # Try wildcards (this is the default) + sync_body = { + **main_sync_body, + "extensions": { + extension_name: { + "enabled": True, + # "lists": ["*"], + # "rooms": ["*"], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions + # room2: ❌ Not requested + # room3: ✅ Shows up because of default `lists` wildcard and is in a list + # room4: ✅ Shows up because of default `lists` wildcard and is in a list + # room5: ✅ Shows up because of default `lists` wildcard and is in a list + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"][extension_name] + .get("rooms") + .keys() + }, + {"room1", "room3", "room4", "room5"}, + exact=True, + ) + + # Empty list will return nothing + sync_body = { + **main_sync_body, + "extensions": { + extension_name: { + "enabled": True, + "lists": [], + "rooms": [], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ❌ Not requested + # room2: ❌ Not requested + # room3: ❌ Not requested + # room4: ❌ Not requested + # room5: ❌ Not requested + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"][extension_name] + .get("rooms") + .keys() + }, + set(), + exact=True, + ) + + # Try wildcard and none + sync_body = { + **main_sync_body, + "extensions": { + extension_name: { + "enabled": True, + "lists": ["*"], + "rooms": [], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ❌ Not requested + # room2: ❌ Not requested + # room3: ✅ Shows up because of default `lists` wildcard and is in a list + # room4: ✅ Shows up because of default `lists` wildcard and is in a list + # room5: ✅ Shows up because of default `lists` wildcard and is in a list + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"][extension_name] + .get("rooms") + .keys() + }, + {"room3", "room4", "room5"}, + exact=True, + ) + + # Try requesting a room that is only in a list + sync_body = { + **main_sync_body, + "extensions": { + extension_name: { + "enabled": True, + "lists": [], + "rooms": [room_id5], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ❌ Not requested + # room2: ❌ Not requested + # room3: ❌ Not requested + # room4: ❌ Not requested + # room5: ✅ Requested via `rooms` and is in a list + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"][extension_name] + .get("rooms") + .keys() + }, + {"room5"}, + exact=True, + ) + class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): """Tests for the to-device sliding sync extension""" @@ -5447,188 +5667,407 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): exact=True, ) - def test_room_account_data_relevant_rooms(self) -> None: + def test_wait_for_new_data(self) -> None: """ - Test out different variations of `lists`/`rooms` we are requesting account data for. + Test to make sure that the Sliding Sync request waits for new data to arrive. + + (Only applies to incremental syncs with a `timeout` specified) """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") - # Create a room and add some room account data - room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id, user1_id, tok=user1_tok) + + sync_body = { + "lists": {}, + "extensions": { + "account_data": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Make an incremental Sliding Sync request with the account_data extension enabled + channel = self.make_request( + "POST", + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, + access_token=user1_tok, + await_result=False, + ) + # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)` + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=5000) + # Bump the global account data to trigger new results self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id1, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, + self.account_data_handler.add_account_data_for_user( + user1_id, + "org.matrix.foobarbaz", + {"foo": "bar"}, ) ) + # Should respond before the 10 second timeout + channel.await_result(timeout_ms=3000) + self.assertEqual(channel.code, 200, channel.json_body) - # Create another room with some room account data - room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id2, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, - ) + # We should see the global account data update + self.assertIncludes( + { + global_event["type"] + for global_event in channel.json_body["extensions"]["account_data"].get( + "global" + ) + }, + {"org.matrix.foobarbaz"}, + exact=True, + ) + self.assertIncludes( + channel.json_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, ) - # Create another room with some room account data - room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id3, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, - ) + def test_wait_for_new_data_timeout(self) -> None: + """ + Test to make sure that the Sliding Sync request waits for new data to arrive but + no data ever arrives so we timeout. We're also making sure that the default data + from the account_data extension doesn't trigger a false-positive for new data. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + sync_body = { + "lists": {}, + "extensions": { + "account_data": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, + access_token=user1_tok, + await_result=False, ) + # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)` + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=5000) + # Wake-up `notifier.wait_for_events(...)` that will cause us test + # `SlidingSyncResult.__bool__` for new results. + self._bump_notifier_wait_for_events( + user1_id, + # We choose `StreamKeyType.PRESENCE` because we're testing for account data + # and don't want to contaminate the account data results using + # `StreamKeyType.ACCOUNT_DATA`. + wake_stream_key=StreamKeyType.PRESENCE, + ) + # Block for a little bit more to ensure we don't see any new results. + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=4000) + # Wait for the sync to complete (wait for the rest of the 10 second timeout, + # 5000 + 4000 + 1200 > 10000) + channel.await_result(timeout_ms=1200) + self.assertEqual(channel.code, 200, channel.json_body) - # Create another room with some room account data - room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id4, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, - ) + self.assertIsNotNone( + channel.json_body["extensions"]["account_data"].get("global") + ) + self.assertIsNotNone( + channel.json_body["extensions"]["account_data"].get("rooms") ) - # Create another room with some room account data - room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok) + +class SlidingSyncReceiptsExtensionTestCase(unittest.HomeserverTestCase): + """Tests for the receipts sliding sync extension""" + + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + room.register_servlets, + sync.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.event_sources = hs.get_event_sources() + self.e2e_keys_handler = hs.get_e2e_keys_handler() + self.account_data_handler = hs.get_account_data_handler() + self.notifier = hs.get_notifier() + self.sync_endpoint = ( + "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" + ) + + # TODO: Remove once https://github.com/element-hq/synapse/pull/17481 lands + def _bump_notifier_wait_for_events(self, user_id: str) -> None: + """ + Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding + Sync results. + """ + # We're expecting some new activity from this point onwards + from_token = self.event_sources.get_current_token() + + triggered_notifier_wait_for_events = False + + async def _on_new_acivity( + before_token: StreamToken, after_token: StreamToken + ) -> bool: + nonlocal triggered_notifier_wait_for_events + triggered_notifier_wait_for_events = True + return True + + # Listen for some new activity for the user. We're just trying to confirm that + # our bump below actually does what we think it does (triggers new activity for + # the user). + result_awaitable = self.notifier.wait_for_events( + user_id, + 1000, + _on_new_acivity, + from_token=from_token, + ) + + # Update the account data so that `notifier.wait_for_events(...)` wakes up. + # We're bumping account data because it won't show up in the Sliding Sync + # response so it won't affect whether we have results. self.get_success( - self.account_data_handler.add_account_data_to_room( - user_id=user1_id, - room_id=room_id5, - account_data_type="org.matrix.roorarraz", - content={"roo": "rar"}, + self.account_data_handler.add_account_data_for_user( + user_id, + "org.matrix.foobarbaz", + {"foo": "bar"}, ) ) - room_id_to_human_name_map = { - room_id1: "room1", - room_id2: "room2", - room_id3: "room3", - room_id4: "room4", - room_id5: "room5", - } + # Wait for our notifier result + self.get_success(result_awaitable) - # Mix lists and rooms + if not triggered_notifier_wait_for_events: + raise AssertionError( + "Expected `notifier.wait_for_events(...)` to be triggered" + ) + + def test_no_data_initial_sync(self) -> None: + """ + Test that enabling the account_data extension works during an intitial sync, + even if there is no-data. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Make an initial Sliding Sync request with the account_data extension enabled sync_body = { - "lists": { - # We expect this list range to include room5 and room4 - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - }, - # We expect this list range to include room5, room4, room3 - "bar-list": { - "ranges": [[0, 2]], - "required_state": [], - "timeline_limit": 0, - }, - }, - "room_subscriptions": { - room_id1: { - "required_state": [], - "timeline_limit": 0, - } - }, + "lists": {}, "extensions": { - "account_data": { + "receipts": { "enabled": True, - "lists": ["foo-list", "non-existent-list"], - "rooms": [room_id1, room_id2, "!non-existent-room"], } }, } response_body, _ = self.do_sync(sync_body, tok=user1_tok) - # room1: ✅ Requested via `rooms` and a room subscription exists - # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions) - # room3: ❌ Not requested - # room4: ✅ Shows up because requested via `lists` and list exists in the response - # room5: ✅ Shows up because requested via `lists` and list exists in the response self.assertIncludes( { - room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] - .get("rooms") - .keys() + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) }, - {"room1", "room4", "room5"}, + # Even though we don't have any global account data set, Synapse saves some + # default push rules for us. + {AccountDataTypes.PUSH_RULES}, + exact=True, + ) + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), exact=True, ) - # Try wildcards (this is the default) + def test_no_data_incremental_sync(self) -> None: + """ + Test that enabling account_data extension works during an incremental sync, even + if there is no-data. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + sync_body = { - "lists": { - # We expect this list range to include room5 and room4 - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - }, - # We expect this list range to include room5, room4, room3 - "bar-list": { - "ranges": [[0, 2]], - "required_state": [], - "timeline_limit": 0, - }, - }, - "room_subscriptions": { - room_id1: { - "required_state": [], - "timeline_limit": 0, + "lists": {}, + "extensions": { + "receipts": { + "enabled": True, } }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Make an incremental Sliding Sync request with the account_data extension enabled + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # There has been no account data changes since the `from_token` so we shouldn't + # see any account data here. + self.assertIncludes( + { + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) + }, + set(), + exact=True, + ) + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, + ) + + def test_global_account_data_initial_sync(self) -> None: + """ + On initial sync, we should return all global account data on initial sync. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Update the global account data + self.get_success( + self.account_data_handler.add_account_data_for_user( + user_id=user1_id, + account_data_type="org.matrix.foobarbaz", + content={"foo": "bar"}, + ) + ) + + # Make an initial Sliding Sync request with the account_data extension enabled + sync_body = { + "lists": {}, "extensions": { - "account_data": { + "receipts": { "enabled": True, - # "lists": ["*"], - # "rooms": ["*"], } }, } response_body, _ = self.do_sync(sync_body, tok=user1_tok) - # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions - # room2: ❌ Not requested - # room3: ✅ Shows up because of default `lists` wildcard and is in a list - # room4: ✅ Shows up because of default `lists` wildcard and is in a list - # room5: ✅ Shows up because of default `lists` wildcard and is in a list + # It should show us all of the global account data self.assertIncludes( { - room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] - .get("rooms") - .keys() + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) }, - {"room1", "room3", "room4", "room5"}, + {AccountDataTypes.PUSH_RULES, "org.matrix.foobarbaz"}, + exact=True, + ) + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), exact=True, ) - # Empty list will return nothing + def test_global_account_data_incremental_sync(self) -> None: + """ + On incremental sync, we should only account data that has changed since the + `from_token`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Add some global account data + self.get_success( + self.account_data_handler.add_account_data_for_user( + user_id=user1_id, + account_data_type="org.matrix.foobarbaz", + content={"foo": "bar"}, + ) + ) + sync_body = { - "lists": { - # We expect this list range to include room5 and room4 - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - }, - # We expect this list range to include room5, room4, room3 - "bar-list": { - "ranges": [[0, 2]], - "required_state": [], - "timeline_limit": 0, - }, + "lists": {}, + "extensions": { + "receipts": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Add some other global account data + self.get_success( + self.account_data_handler.add_account_data_for_user( + user_id=user1_id, + account_data_type="org.matrix.doodardaz", + content={"doo": "dar"}, + ) + ) + + # Make an incremental Sliding Sync request with the account_data extension enabled + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertIncludes( + { + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) }, + # We should only see the new global account data that happened after the `from_token` + {"org.matrix.doodardaz"}, + exact=True, + ) + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, + ) + + def test_room_account_data_initial_sync(self) -> None: + """ + On initial sync, we return all account data for a given room but only for + rooms that we request and are being returned in the Sliding Sync response. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a room and add some room account data + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Create another room with some room account data + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Make an initial Sliding Sync request with the account_data extension enabled + sync_body = { + "lists": {}, "room_subscriptions": { room_id1: { "required_state": [], @@ -5636,47 +6075,66 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): } }, "extensions": { - "account_data": { + "receipts": { "enabled": True, - "lists": [], - "rooms": [], + "rooms": [room_id1, room_id2], } }, } response_body, _ = self.do_sync(sync_body, tok=user1_tok) - # room1: ❌ Not requested - # room2: ❌ Not requested - # room3: ❌ Not requested - # room4: ❌ Not requested - # room5: ❌ Not requested + self.assertIsNotNone(response_body["extensions"]["account_data"].get("global")) + # Even though we requested room2, we only expect room1 to show up because that's + # the only room in the Sliding Sync response (room2 is not one of our room + # subscriptions or in a sliding window list). + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id1}, + exact=True, + ) self.assertIncludes( { - room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] + event["type"] + for event in response_body["extensions"]["account_data"] .get("rooms") - .keys() + .get(room_id1) }, - set(), + {"org.matrix.roorarraz"}, exact=True, ) - # Try wildcard and none + def test_room_account_data_incremental_sync(self) -> None: + """ + On incremental sync, we return all account data for a given room but only for + rooms that we request and are being returned in the Sliding Sync response. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a room and add some room account data + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Create another room with some room account data + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + sync_body = { - "lists": { - # We expect this list range to include room5 and room4 - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - }, - # We expect this list range to include room5, room4, room3 - "bar-list": { - "ranges": [[0, 2]], - "required_state": [], - "timeline_limit": 0, - }, - }, + "lists": {}, "room_subscriptions": { room_id1: { "required_state": [], @@ -5684,28 +6142,53 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): } }, "extensions": { - "account_data": { + "receipts": { "enabled": True, - "lists": ["*"], - "rooms": [], + "rooms": [room_id1, room_id2], } }, } - response_body, _ = self.do_sync(sync_body, tok=user1_tok) + _, from_token = self.do_sync(sync_body, tok=user1_tok) - # room1: ❌ Not requested - # room2: ❌ Not requested - # room3: ✅ Shows up because of default `lists` wildcard and is in a list - # room4: ✅ Shows up because of default `lists` wildcard and is in a list - # room5: ✅ Shows up because of default `lists` wildcard and is in a list + # Add some other room account data + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz2", + content={"roo": "rar"}, + ) + ) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz2", + content={"roo": "rar"}, + ) + ) + + # Make an incremental Sliding Sync request with the account_data extension enabled + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertIsNotNone(response_body["extensions"]["account_data"].get("global")) + # Even though we requested room2, we only expect room1 to show up because that's + # the only room in the Sliding Sync response (room2 is not one of our room + # subscriptions or in a sliding window list). + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id1}, + exact=True, + ) + # We should only see the new room account data that happened after the `from_token` self.assertIncludes( { - room_id_to_human_name_map[room_id] - for room_id in response_body["extensions"]["account_data"] + event["type"] + for event in response_body["extensions"]["account_data"] .get("rooms") - .keys() + .get(room_id1) }, - {"room3", "room4", "room5"}, + {"org.matrix.roorarraz2"}, exact=True, ) @@ -5726,7 +6209,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): sync_body = { "lists": {}, "extensions": { - "account_data": { + "receipts": { "enabled": True, } }, @@ -5785,7 +6268,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): sync_body = { "lists": {}, "extensions": { - "account_data": { + "receipts": { "enabled": True, } }, @@ -5805,13 +6288,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): channel.await_result(timeout_ms=5000) # Wake-up `notifier.wait_for_events(...)` that will cause us test # `SlidingSyncResult.__bool__` for new results. - self._bump_notifier_wait_for_events( - user1_id, - # We choose `StreamKeyType.PRESENCE` because we're testing for account data - # and don't want to contaminate the account data results using - # `StreamKeyType.ACCOUNT_DATA`. - wake_stream_key=StreamKeyType.PRESENCE, - ) + self._bump_notifier_wait_for_events(user1_id) # Block for a little bit more to ensure we don't see any new results. with self.assertRaises(TimedOutException): channel.await_result(timeout_ms=4000) |