Diffstat (limited to 'tests/rest/client/test_sync.py')
-rw-r--r-- | tests/rest/client/test_sync.py | 3713
1 file changed, 2358 insertions, 1355 deletions
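The patch below converts the sliding sync tests from hand-rolled make_request() calls to a shared SlidingSyncBase.do_sync() helper, which POSTs the given body to the MSC3575 sliding sync endpoint, asserts a 200 response, and returns the response body together with its "pos" token. The snippet here is an illustrative sketch of that pattern only, not code from the patch; the names it uses (SlidingSyncBase, do_sync, register_user, login, self.helper.create_room_as) are taken from the diff that follows and Synapse's HomeserverTestCase test harness.

class ExampleSlidingSyncTest(SlidingSyncBase):
    # Illustrative sketch, mirroring test_sync_list in the diff below;
    # not part of the patch itself.
    def test_room_shows_up_in_list(self) -> None:
        # Create a user and a public room via the standard test harness.
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")
        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)

        # Build the request body once and reuse it; do_sync() asserts the
        # request succeeded and returns (response_body, pos).
        sync_body = {
            "lists": {
                "foo-list": {
                    "ranges": [[0, 99]],
                    "required_state": [],
                    "timeline_limit": 1,
                }
            }
        }
        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)

        # The joined room should appear in the requested list and in "rooms".
        self.assertEqual(
            response_body["lists"]["foo-list"]["ops"][0]["room_ids"], [room_id]
        )
        self.assertIn(room_id, response_body["rooms"])

        # Incremental sync: feed the previous "pos" back in as since=.
        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)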
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 2628869de6..5abf1041be 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -21,7 +21,7 @@ import json import logging from http import HTTPStatus -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple from parameterized import parameterized, parameterized_class @@ -37,6 +37,7 @@ from synapse.api.constants import ( ReceiptTypes, RelationTypes, ) +from synapse.api.room_versions import RoomVersions from synapse.events import EventBase from synapse.handlers.sliding_sync import StateValues from synapse.rest.client import ( @@ -50,16 +51,24 @@ from synapse.rest.client import ( sync, ) from synapse.server import HomeServer -from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID +from synapse.types import ( + JsonDict, + RoomStreamToken, + SlidingSyncStreamToken, + StreamKeyType, + StreamToken, + UserID, +) +from synapse.types.handlers import SlidingSyncConfig from synapse.util import Clock +from synapse.util.stringutils import random_string from tests import unittest from tests.federation.transport.test_knocking import ( KnockingStrippedStateEventHelperMixin, ) -from tests.server import FakeChannel, TimedOutException -from tests.test_utils.event_injection import mark_event_as_partial_state -from tests.unittest import skip_unless +from tests.server import TimedOutException +from tests.test_utils.event_injection import create_event, mark_event_as_partial_state logger = logging.getLogger(__name__) @@ -1225,7 +1234,131 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase): self.assertIn(self.included_room_id, channel.json_body["rooms"]["join"]) -class SlidingSyncTestCase(unittest.HomeserverTestCase): +class SlidingSyncBase(unittest.HomeserverTestCase): + """Base class for sliding sync test cases""" + + sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" + + def default_config(self) -> JsonDict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + return config + + def do_sync( + self, sync_body: JsonDict, *, since: Optional[str] = None, tok: str + ) -> Tuple[JsonDict, str]: + """Do a sliding sync request with given body. + + Asserts the request was successful. + + Attributes: + sync_body: The full request body to use + since: Optional since token + tok: Access token to use + + Returns: + A tuple of the response body and the `pos` field. + """ + + sync_path = self.sync_endpoint + if since: + sync_path += f"?pos={since}" + + channel = self.make_request( + method="POST", + path=sync_path, + content=sync_body, + access_token=tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + return channel.json_body, channel.json_body["pos"] + + def _bump_notifier_wait_for_events( + self, + user_id: str, + wake_stream_key: Literal[ + StreamKeyType.ACCOUNT_DATA, + StreamKeyType.PRESENCE, + ], + ) -> None: + """ + Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding + Sync results. + + Args: + user_id: The user ID to wake up the notifier for + wake_stream_key: The stream key to wake up. This will create an actual new + entity in that stream so it's best to choose one that won't affect the + Sliding Sync results you're testing for. In other words, if your testing + account data, choose `StreamKeyType.PRESENCE` instead. 
We support two + possible stream keys because you're probably testing one or the other so + one is always a "safe" option. + """ + # We're expecting some new activity from this point onwards + from_token = self.hs.get_event_sources().get_current_token() + + triggered_notifier_wait_for_events = False + + async def _on_new_acivity( + before_token: StreamToken, after_token: StreamToken + ) -> bool: + nonlocal triggered_notifier_wait_for_events + triggered_notifier_wait_for_events = True + return True + + notifier = self.hs.get_notifier() + + # Listen for some new activity for the user. We're just trying to confirm that + # our bump below actually does what we think it does (triggers new activity for + # the user). + result_awaitable = notifier.wait_for_events( + user_id, + 1000, + _on_new_acivity, + from_token=from_token, + ) + + # Update the account data or presence so that `notifier.wait_for_events(...)` + # wakes up. We chose these two options because they're least likely to show up + # in the Sliding Sync response so it won't affect whether we have results. + if wake_stream_key == StreamKeyType.ACCOUNT_DATA: + self.get_success( + self.hs.get_account_data_handler().add_account_data_for_user( + user_id, + "org.matrix.foobarbaz", + {"foo": "bar"}, + ) + ) + elif wake_stream_key == StreamKeyType.PRESENCE: + sending_user_id = self.register_user( + "user_bump_notifier_wait_for_events_" + random_string(10), "pass" + ) + sending_user_tok = self.login(sending_user_id, "pass") + test_msg = {"foo": "bar"} + chan = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.test/1234", + content={"messages": {user_id: {"d1": test_msg}}}, + access_token=sending_user_tok, + ) + self.assertEqual(chan.code, 200, chan.result) + else: + raise AssertionError( + "Unable to wake that stream in _bump_notifier_wait_for_events(...)" + ) + + # Wait for our notifier result + self.get_success(result_awaitable) + + if not triggered_notifier_wait_for_events: + raise AssertionError( + "Expected `notifier.wait_for_events(...)` to be triggered" + ) + + +class SlidingSyncTestCase(SlidingSyncBase): """ Tests regarding MSC3575 Sliding Sync `/sync` endpoint. """ @@ -1238,22 +1371,10 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): devices.register_servlets, ] - def default_config(self) -> JsonDict: - config = super().default_config() - # Enable sliding sync - config["experimental_features"] = {"msc3575_enabled": True} - return config - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main - self.sync_endpoint = ( - "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" - ) - self.store = hs.get_datastores().main self.event_sources = hs.get_event_sources() self.storage_controllers = hs.get_storage_controllers() - self.account_data_handler = hs.get_account_data_handler() - self.notifier = hs.get_notifier() def _assertRequiredStateIncludes( self, @@ -1379,94 +1500,41 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): return room_id - def _bump_notifier_wait_for_events(self, user_id: str) -> None: - """ - Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding - Sync results. 
- """ - # We're expecting some new activity from this point onwards - from_token = self.event_sources.get_current_token() - - triggered_notifier_wait_for_events = False - - async def _on_new_acivity( - before_token: StreamToken, after_token: StreamToken - ) -> bool: - nonlocal triggered_notifier_wait_for_events - triggered_notifier_wait_for_events = True - return True - - # Listen for some new activity for the user. We're just trying to confirm that - # our bump below actually does what we think it does (triggers new activity for - # the user). - result_awaitable = self.notifier.wait_for_events( - user_id, - 1000, - _on_new_acivity, - from_token=from_token, - ) - - # Update the account data so that `notifier.wait_for_events(...)` wakes up. - # We're bumping account data because it won't show up in the Sliding Sync - # response so it won't affect whether we have results. - self.get_success( - self.account_data_handler.add_account_data_for_user( - user_id, - "org.matrix.foobarbaz", - {"foo": "bar"}, - ) - ) - - # Wait for our notifier result - self.get_success(result_awaitable) - - if not triggered_notifier_wait_for_events: - raise AssertionError( - "Expected `notifier.wait_for_events(...)` to be triggered" - ) - def test_sync_list(self) -> None: """ Test that room IDs show up in the Sliding Sync `lists` """ - alice_user_id = self.register_user("alice", "correcthorse") - alice_access_token = self.login(alice_user_id, "correcthorse") + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") - room_id = self.helper.create_room_as( - alice_user_id, tok=alice_access_token, is_public=True - ) + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 99]], - "required_state": [ - ["m.room.join_rules", ""], - ["m.room.history_visibility", ""], - ["m.space.child", "*"], - ], - "timeline_limit": 1, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 99]], + "required_state": [ + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.space.child", "*"], + ], + "timeline_limit": 1, } - }, - access_token=alice_access_token, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Make sure it has the foo-list we requested self.assertListEqual( - list(channel.json_body["lists"].keys()), + list(response_body["lists"].keys()), ["foo-list"], - channel.json_body["lists"].keys(), + response_body["lists"].keys(), ) # Make sure the list includes the room we are joined to self.assertListEqual( - list(channel.json_body["lists"]["foo-list"]["ops"]), + list(response_body["lists"]["foo-list"]["ops"]), [ { "op": "SYNC", @@ -1474,15 +1542,15 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "room_ids": [room_id], } ], - channel.json_body["lists"]["foo-list"], + response_body["lists"]["foo-list"], ) def test_wait_for_sync_token(self) -> None: """ Test that worker will wait until it catches up to the given token """ - alice_user_id = self.register_user("alice", "correcthorse") - alice_access_token = self.login(alice_user_id, "correcthorse") + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") # Create a future token that will cause us to wait. 
Since we never send a new # event to reach that future stream_ordering, the worker will wait until the @@ -1496,27 +1564,28 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) future_position_token_serialized = self.get_success( - future_position_token.to_string(self.store) + SlidingSyncStreamToken(future_position_token, 0).to_string(self.store) ) # Make the Sliding Sync request + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 99]], + "required_state": [ + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.space.child", "*"], + ], + "timeline_limit": 1, + } + } + } channel = self.make_request( "POST", self.sync_endpoint + f"?pos={future_position_token_serialized}", - { - "lists": { - "foo-list": { - "ranges": [[0, 99]], - "required_state": [ - ["m.room.join_rules", ""], - ["m.room.history_visibility", ""], - ["m.space.child", "*"], - ], - "timeline_limit": 1, - } - } - }, - access_token=alice_access_token, + content=sync_body, + access_token=user1_tok, await_result=False, ) # Block for 10 seconds to make `notifier.wait_for_stream_token(from_token)` @@ -1544,23 +1613,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): room_id = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id, user1_id, tok=user1_tok) - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": 1, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + "?timeout=10000" - + f"&pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": { - "foo-list": { - "ranges": [[0, 0]], - "required_state": [], - "timeline_limit": 1, - } - } - }, + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, access_token=user1_tok, await_result=False, ) @@ -1587,12 +1655,6 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): channel.json_body["rooms"][room_id]["timeline"], ) - # TODO: Once we remove `ops`, we should be able to add a `RoomResult.__bool__` to - # check if there are any updates since the `from_token`. - @skip_unless( - False, - "Once we remove ops from the Sliding Sync response, this test should pass", - ) def test_wait_for_new_data_timeout(self) -> None: """ Test to make sure that the Sliding Sync request waits for new data to arrive but @@ -1607,23 +1669,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): room_id = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id, user1_id, tok=user1_tok) - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": 1, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + "?timeout=10000" - + f"&pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": { - "foo-list": { - "ranges": [[0, 0]], - "required_state": [], - "timeline_limit": 1, - } - } - }, + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, access_token=user1_tok, await_result=False, ) @@ -1632,7 +1693,9 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): channel.await_result(timeout_ms=5000) # Wake-up `notifier.wait_for_events(...)` that will cause us test # `SlidingSyncResult.__bool__` for new results. 
- self._bump_notifier_wait_for_events(user1_id) + self._bump_notifier_wait_for_events( + user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA + ) # Block for a little bit more to ensure we don't see any new results. with self.assertRaises(TimedOutException): channel.await_result(timeout_ms=4000) @@ -1641,12 +1704,8 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): channel.await_result(timeout_ms=1200) self.assertEqual(channel.code, 200, channel.json_body) - # We still see rooms because that's how Sliding Sync lists work but we reached - # the timeout before seeing them - self.assertEqual( - [event["event_id"] for event in channel.json_body["rooms"].keys()], - [room_id], - ) + # There should be no room sent down. + self.assertFalse(channel.json_body["rooms"]) def test_filter_list(self) -> None: """ @@ -1682,55 +1741,50 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.invite(invite_room_id, src=user2_id, targ=user1_id, tok=user2_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - # Absense of filters does not imply "False" values - "all": { - "ranges": [[0, 99]], - "required_state": [], - "timeline_limit": 1, - "filters": {}, - }, - # Test single truthy filter - "dms": { - "ranges": [[0, 99]], - "required_state": [], - "timeline_limit": 1, - "filters": {"is_dm": True}, - }, - # Test single falsy filter - "non-dms": { - "ranges": [[0, 99]], - "required_state": [], - "timeline_limit": 1, - "filters": {"is_dm": False}, - }, - # Test how multiple filters should stack (AND'd together) - "room-invites": { - "ranges": [[0, 99]], - "required_state": [], - "timeline_limit": 1, - "filters": {"is_dm": False, "is_invite": True}, - }, - } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + sync_body = { + "lists": { + # Absense of filters does not imply "False" values + "all": { + "ranges": [[0, 99]], + "required_state": [], + "timeline_limit": 1, + "filters": {}, + }, + # Test single truthy filter + "dms": { + "ranges": [[0, 99]], + "required_state": [], + "timeline_limit": 1, + "filters": {"is_dm": True}, + }, + # Test single falsy filter + "non-dms": { + "ranges": [[0, 99]], + "required_state": [], + "timeline_limit": 1, + "filters": {"is_dm": False}, + }, + # Test how multiple filters should stack (AND'd together) + "room-invites": { + "ranges": [[0, 99]], + "required_state": [], + "timeline_limit": 1, + "filters": {"is_dm": False, "is_invite": True}, + }, + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Make sure it has the foo-list we requested self.assertListEqual( - list(channel.json_body["lists"].keys()), + list(response_body["lists"].keys()), ["all", "dms", "non-dms", "room-invites"], - channel.json_body["lists"].keys(), + response_body["lists"].keys(), ) # Make sure the lists have the correct rooms self.assertListEqual( - list(channel.json_body["lists"]["all"]["ops"]), + list(response_body["lists"]["all"]["ops"]), [ { "op": "SYNC", @@ -1743,10 +1797,10 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ], } ], - list(channel.json_body["lists"]["all"]), + list(response_body["lists"]["all"]), ) self.assertListEqual( - list(channel.json_body["lists"]["dms"]["ops"]), + list(response_body["lists"]["dms"]["ops"]), [ { "op": "SYNC", @@ -1754,10 +1808,10 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "room_ids": [invited_dm_room_id, joined_dm_room_id], } ], - list(channel.json_body["lists"]["dms"]), + 
list(response_body["lists"]["dms"]), ) self.assertListEqual( - list(channel.json_body["lists"]["non-dms"]["ops"]), + list(response_body["lists"]["non-dms"]["ops"]), [ { "op": "SYNC", @@ -1765,10 +1819,10 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "room_ids": [invite_room_id, room_id], } ], - list(channel.json_body["lists"]["non-dms"]), + list(response_body["lists"]["non-dms"]), ) self.assertListEqual( - list(channel.json_body["lists"]["room-invites"]["ops"]), + list(response_body["lists"]["room-invites"]["ops"]), [ { "op": "SYNC", @@ -1776,14 +1830,14 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "room_ids": [invite_room_id], } ], - list(channel.json_body["lists"]["room-invites"]), + list(response_body["lists"]["room-invites"]), ) # Ensure DM's are correctly marked self.assertDictEqual( { room_id: room.get("is_dm") - for room_id, room in channel.json_body["rooms"].items() + for room_id, room in response_body["rooms"].items() }, { invite_room_id: None, @@ -1810,36 +1864,31 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id2, "activity in room2", tok=user1_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 99]], - "required_state": [ - ["m.room.join_rules", ""], - ["m.room.history_visibility", ""], - ["m.space.child", "*"], - ], - "timeline_limit": 1, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 99]], + "required_state": [ + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.space.child", "*"], + ], + "timeline_limit": 1, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Make sure it has the foo-list we requested self.assertListEqual( - list(channel.json_body["lists"].keys()), + list(response_body["lists"].keys()), ["foo-list"], - channel.json_body["lists"].keys(), + response_body["lists"].keys(), ) # Make sure the list is sorted in the way we expect self.assertListEqual( - list(channel.json_body["lists"]["foo-list"]["ops"]), + list(response_body["lists"]["foo-list"]["ops"]), [ { "op": "SYNC", @@ -1847,7 +1896,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "room_ids": [room_id2, room_id1, room_id3], } ], - channel.json_body["lists"]["foo-list"], + response_body["lists"]["foo-list"], ) def test_sliced_windows(self) -> None: @@ -1863,35 +1912,26 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) # Make the Sliding Sync request for a single room - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 0]], - "required_state": [ - ["m.room.join_rules", ""], - ["m.room.history_visibility", ""], - ["m.space.child", "*"], - ], - "timeline_limit": 1, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": 1, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Make sure it has the foo-list we requested self.assertListEqual( - list(channel.json_body["lists"].keys()), + list(response_body["lists"].keys()), ["foo-list"], - channel.json_body["lists"].keys(), + response_body["lists"].keys(), ) # Make sure the list is sorted in the way we expect self.assertListEqual( - 
list(channel.json_body["lists"]["foo-list"]["ops"]), + list(response_body["lists"]["foo-list"]["ops"]), [ { "op": "SYNC", @@ -1899,39 +1939,30 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "room_ids": [room_id3], } ], - channel.json_body["lists"]["foo-list"], + response_body["lists"]["foo-list"], ) # Make the Sliding Sync request for the first two rooms - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [ - ["m.room.join_rules", ""], - ["m.room.history_visibility", ""], - ["m.space.child", "*"], - ], - "timeline_limit": 1, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 1, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Make sure it has the foo-list we requested self.assertListEqual( - list(channel.json_body["lists"].keys()), + list(response_body["lists"].keys()), ["foo-list"], - channel.json_body["lists"].keys(), + response_body["lists"].keys(), ) # Make sure the list is sorted in the way we expect self.assertListEqual( - list(channel.json_body["lists"]["foo-list"]["ops"]), + list(response_body["lists"]["foo-list"]["ops"]), [ { "op": "SYNC", @@ -1939,7 +1970,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "room_ids": [room_id3, room_id2], } ], - channel.json_body["lists"]["foo-list"], + response_body["lists"]["foo-list"], ) def test_rooms_meta_when_joined(self) -> None: @@ -1970,43 +2001,38 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.join(room_id1, user1_id, tok=user1_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Reflect the current state of the room self.assertEqual( - channel.json_body["rooms"][room_id1]["name"], + response_body["rooms"][room_id1]["name"], "my super room", - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) self.assertEqual( - channel.json_body["rooms"][room_id1]["avatar"], + response_body["rooms"][room_id1]["avatar"], "mxc://DUMMY_MEDIA_ID", - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) self.assertEqual( - channel.json_body["rooms"][room_id1]["joined_count"], + response_body["rooms"][room_id1]["joined_count"], 2, ) self.assertEqual( - channel.json_body["rooms"][room_id1]["invited_count"], + response_body["rooms"][room_id1]["invited_count"], 0, ) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("is_dm"), + response_body["rooms"][room_id1].get("is_dm"), ) def test_rooms_meta_when_invited(self) -> None: @@ -2053,44 +2079,39 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, } - }, - access_token=user1_tok, - ) - 
self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # This should still reflect the current state of the room even when the user is # invited. self.assertEqual( - channel.json_body["rooms"][room_id1]["name"], + response_body["rooms"][room_id1]["name"], "my super duper room", - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) self.assertEqual( - channel.json_body["rooms"][room_id1]["avatar"], + response_body["rooms"][room_id1]["avatar"], "mxc://UPDATED_DUMMY_MEDIA_ID", - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) self.assertEqual( - channel.json_body["rooms"][room_id1]["joined_count"], + response_body["rooms"][room_id1]["joined_count"], 1, ) self.assertEqual( - channel.json_body["rooms"][room_id1]["invited_count"], + response_body["rooms"][room_id1]["invited_count"], 1, ) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("is_dm"), + response_body["rooms"][room_id1].get("is_dm"), ) def test_rooms_meta_when_banned(self) -> None: @@ -2137,45 +2158,40 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Reflect the state of the room at the time of leaving self.assertEqual( - channel.json_body["rooms"][room_id1]["name"], + response_body["rooms"][room_id1]["name"], "my super room", - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) self.assertEqual( - channel.json_body["rooms"][room_id1]["avatar"], + response_body["rooms"][room_id1]["avatar"], "mxc://DUMMY_MEDIA_ID", - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) self.assertEqual( - channel.json_body["rooms"][room_id1]["joined_count"], + response_body["rooms"][room_id1]["joined_count"], # FIXME: The actual number should be "1" (user2) but we currently don't # support this for rooms where the user has left/been banned. 0, ) self.assertEqual( - channel.json_body["rooms"][room_id1]["invited_count"], + response_body["rooms"][room_id1]["invited_count"], 0, ) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("is_dm"), + response_body["rooms"][room_id1].get("is_dm"), ) def test_rooms_meta_heroes(self) -> None: @@ -2215,61 +2231,56 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.invite(room_id2, src=user2_id, targ=user3_id, tok=user2_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Room1 has a name so we shouldn't see any `heroes` which the client would use # the calculate the room name themselves. 
self.assertEqual( - channel.json_body["rooms"][room_id1]["name"], + response_body["rooms"][room_id1]["name"], "my super room", - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("heroes")) + self.assertIsNone(response_body["rooms"][room_id1].get("heroes")) self.assertEqual( - channel.json_body["rooms"][room_id1]["joined_count"], + response_body["rooms"][room_id1]["joined_count"], 2, ) self.assertEqual( - channel.json_body["rooms"][room_id1]["invited_count"], + response_body["rooms"][room_id1]["invited_count"], 1, ) # Room2 doesn't have a name so we should see `heroes` populated - self.assertIsNone(channel.json_body["rooms"][room_id2].get("name")) + self.assertIsNone(response_body["rooms"][room_id2].get("name")) self.assertCountEqual( [ hero["user_id"] - for hero in channel.json_body["rooms"][room_id2].get("heroes", []) + for hero in response_body["rooms"][room_id2].get("heroes", []) ], # Heroes shouldn't include the user themselves (we shouldn't see user1) [user2_id, user3_id], ) self.assertEqual( - channel.json_body["rooms"][room_id2]["joined_count"], + response_body["rooms"][room_id2]["joined_count"], 2, ) self.assertEqual( - channel.json_body["rooms"][room_id2]["invited_count"], + response_body["rooms"][room_id2]["invited_count"], 1, ) # We didn't request any state so we shouldn't see any `required_state` - self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state")) - self.assertIsNone(channel.json_body["rooms"][room_id2].get("required_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("required_state")) + self.assertIsNone(response_body["rooms"][room_id2].get("required_state")) def test_rooms_meta_heroes_max(self) -> None: """ @@ -2308,44 +2319,39 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.join(room_id1, user7_id, tok=user7_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Room2 doesn't have a name so we should see `heroes` populated - self.assertIsNone(channel.json_body["rooms"][room_id1].get("name")) + self.assertIsNone(response_body["rooms"][room_id1].get("name")) self.assertCountEqual( [ hero["user_id"] - for hero in channel.json_body["rooms"][room_id1].get("heroes", []) + for hero in response_body["rooms"][room_id1].get("heroes", []) ], # Heroes should be the first 5 users in the room (excluding the user # themselves, we shouldn't see `user1`) [user2_id, user3_id, user4_id, user5_id, user6_id], ) self.assertEqual( - channel.json_body["rooms"][room_id1]["joined_count"], + response_body["rooms"][room_id1]["joined_count"], 7, ) self.assertEqual( - channel.json_body["rooms"][room_id1]["invited_count"], + response_body["rooms"][room_id1]["invited_count"], 0, ) # We didn't request any state so we shouldn't see any `required_state` - self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("required_state")) def test_rooms_meta_heroes_when_banned(self) -> None: """ @@ -2386,28 +2392,23 @@ class 
SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.invite(room_id1, src=user2_id, targ=user5_id, tok=user2_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 0, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Room2 doesn't have a name so we should see `heroes` populated - self.assertIsNone(channel.json_body["rooms"][room_id1].get("name")) + self.assertIsNone(response_body["rooms"][room_id1].get("name")) self.assertCountEqual( [ hero["user_id"] - for hero in channel.json_body["rooms"][room_id1].get("heroes", []) + for hero in response_body["rooms"][room_id1].get("heroes", []) ], # Heroes shouldn't include the user themselves (we shouldn't see user1). We # also shouldn't see user4 since they joined after user1 was banned. @@ -2418,13 +2419,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) self.assertEqual( - channel.json_body["rooms"][room_id1]["joined_count"], + response_body["rooms"][room_id1]["joined_count"], # FIXME: The actual number should be "1" (user2) but we currently don't # support this for rooms where the user has left/been banned. 0, ) self.assertEqual( - channel.json_body["rooms"][room_id1]["invited_count"], + response_body["rooms"][room_id1]["invited_count"], # We shouldn't see user5 since they were invited after user1 was banned. # # FIXME: The actual number should be "1" (user3) but we currently don't @@ -2457,46 +2458,41 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 3, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # We expect to saturate the `timeline_limit` (there are more than 3 messages in the room) self.assertEqual( - channel.json_body["rooms"][room_id1]["limited"], + response_body["rooms"][room_id1]["limited"], True, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) # Check to make sure the latest events are returned self.assertEqual( [ event["event_id"] - for event in channel.json_body["rooms"][room_id1]["timeline"] + for event in response_body["rooms"][room_id1]["timeline"] ], [ event_response4["event_id"], event_response5["event_id"], user1_join_response["event_id"], ], - channel.json_body["rooms"][room_id1]["timeline"], + response_body["rooms"][room_id1]["timeline"], ) # Check to make sure the `prev_batch` points at the right place prev_batch_token = self.get_success( StreamToken.from_string( - self.store, channel.json_body["rooms"][room_id1]["prev_batch"] + self.store, response_body["rooms"][room_id1]["prev_batch"] ) ) prev_batch_room_stream_token_serialized = self.get_success( @@ -2520,9 +2516,9 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # With no `from_token` (initial sync), it's all historical since there is no # 
"live" range self.assertEqual( - channel.json_body["rooms"][room_id1]["num_live"], + response_body["rooms"][room_id1]["num_live"], 0, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) def test_rooms_not_limited_initial_sync(self) -> None: @@ -2543,44 +2539,39 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # Make the Sliding Sync request timeline_limit = 100 - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": timeline_limit, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": timeline_limit, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # The timeline should be `limited=False` because we have all of the events (no # more to paginate to) self.assertEqual( - channel.json_body["rooms"][room_id1]["limited"], + response_body["rooms"][room_id1]["limited"], False, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) expected_number_of_events = 9 # We're just looking to make sure we got all of the events before hitting the `timeline_limit` self.assertEqual( - len(channel.json_body["rooms"][room_id1]["timeline"]), + len(response_body["rooms"][room_id1]["timeline"]), expected_number_of_events, - channel.json_body["rooms"][room_id1]["timeline"], + response_body["rooms"][room_id1]["timeline"], ) self.assertLessEqual(expected_number_of_events, timeline_limit) # With no `from_token` (initial sync), it's all historical since there is no # "live" token range. self.assertEqual( - channel.json_body["rooms"][room_id1]["num_live"], + response_body["rooms"][room_id1]["num_live"], 0, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) def test_rooms_incremental_sync(self) -> None: @@ -2598,7 +2589,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # Make an initial Sliding Sync request to grab a token. This is also a sanity # check that we can go from initial to incremental sync. - sync_params = { + sync_body = { "lists": { "foo-list": { "ranges": [[0, 1]], @@ -2607,14 +2598,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): } } } - channel = self.make_request( - "POST", - self.sync_endpoint, - sync_params, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) - next_pos = channel.json_body["pos"] + _, from_token = self.do_sync(sync_body, tok=user1_tok) # Send some events but don't send enough to saturate the `timeline_limit`. # We want to later test that we only get the new events since the `next_pos` @@ -2622,41 +2606,35 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) # Make an incremental Sliding Sync request (what we're trying to test) - channel = self.make_request( - "POST", - self.sync_endpoint + f"?pos={next_pos}", - sync_params, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # We only expect to see the new events since the last sync which isn't enough to # fill up the `timeline_limit`. 
self.assertEqual( - channel.json_body["rooms"][room_id1]["limited"], + response_body["rooms"][room_id1]["limited"], False, - f'Our `timeline_limit` was {sync_params["lists"]["foo-list"]["timeline_limit"]} ' - + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. ' - + str(channel.json_body["rooms"][room_id1]), + f'Our `timeline_limit` was {sync_body["lists"]["foo-list"]["timeline_limit"]} ' + + f'and {len(response_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. ' + + str(response_body["rooms"][room_id1]), ) # Check to make sure the latest events are returned self.assertEqual( [ event["event_id"] - for event in channel.json_body["rooms"][room_id1]["timeline"] + for event in response_body["rooms"][room_id1]["timeline"] ], [ event_response2["event_id"], event_response3["event_id"], ], - channel.json_body["rooms"][room_id1]["timeline"], + response_body["rooms"][room_id1]["timeline"], ) # All events are "live" self.assertEqual( - channel.json_body["rooms"][room_id1]["num_live"], + response_body["rooms"][room_id1]["num_live"], 2, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) def test_rooms_bump_stamp(self) -> None: @@ -2701,33 +2679,27 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) # Make the Sliding Sync request - timeline_limit = 100 - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": timeline_limit, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 100, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Make sure it has the foo-list we requested self.assertListEqual( - list(channel.json_body["lists"].keys()), + list(response_body["lists"].keys()), ["foo-list"], - channel.json_body["lists"].keys(), + response_body["lists"].keys(), ) # Make sure the list includes the rooms in the right order self.assertListEqual( - list(channel.json_body["lists"]["foo-list"]["ops"]), + list(response_body["lists"]["foo-list"]["ops"]), [ { "op": "SYNC", @@ -2737,23 +2709,137 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "room_ids": [room_id1, room_id2], } ], - channel.json_body["lists"]["foo-list"], + response_body["lists"]["foo-list"], ) # The `bump_stamp` for room1 should point at the latest message (not the # reaction since it's not one of the `DEFAULT_BUMP_EVENT_TYPES`) self.assertEqual( - channel.json_body["rooms"][room_id1]["bump_stamp"], + response_body["rooms"][room_id1]["bump_stamp"], event_pos1.stream, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) # The `bump_stamp` for room2 should point at the latest message self.assertEqual( - channel.json_body["rooms"][room_id2]["bump_stamp"], + response_body["rooms"][room_id2]["bump_stamp"], event_pos2.stream, - channel.json_body["rooms"][room_id2], + response_body["rooms"][room_id2], + ) + + def test_rooms_bump_stamp_backfill(self) -> None: + """ + Test that `bump_stamp` ignores backfilled events, i.e. events with a + negative stream ordering. 
+ """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a remote room + creator = "@user:other" + room_id = "!foo:other" + shared_kwargs = { + "room_id": room_id, + "room_version": "10", + } + + create_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[], + type=EventTypes.Create, + state_key="", + sender=creator, + **shared_kwargs, + ) + ) + creator_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[create_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id], + type=EventTypes.Member, + state_key=creator, + content={"membership": Membership.JOIN}, + sender=creator, + **shared_kwargs, + ) + ) + # We add a message event as a valid "bump type" + msg_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[creator_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id], + type=EventTypes.Message, + content={"body": "foo", "msgtype": "m.text"}, + sender=creator, + **shared_kwargs, + ) + ) + invite_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[msg_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id], + type=EventTypes.Member, + state_key=user1_id, + content={"membership": Membership.INVITE}, + sender=creator, + **shared_kwargs, + ) + ) + + remote_events_and_contexts = [ + create_tuple, + creator_tuple, + msg_tuple, + invite_tuple, + ] + + # Ensure the local HS knows the room version + self.get_success( + self.store.store_room(room_id, creator, False, RoomVersions.V10) + ) + + # Persist these events as backfilled events. + persistence = self.hs.get_storage_controllers().persistence + assert persistence is not None + + for event, context in remote_events_and_contexts: + self.get_success(persistence.persist_event(event, context, backfilled=True)) + + # Now we join the local user to the room + join_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[invite_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id], + type=EventTypes.Member, + state_key=user1_id, + content={"membership": Membership.JOIN}, + sender=user1_id, + **shared_kwargs, + ) ) + self.get_success(persistence.persist_event(*join_tuple)) + + # Doing an SS request should return a positive `bump_stamp`, even though + # the only event that matches the bump types has as negative stream + # ordering. + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 5, + } + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + self.assertGreater(response_body["rooms"][room_id]["bump_stamp"], 0) def test_rooms_newly_joined_incremental_sync(self) -> None: """ @@ -2771,7 +2857,20 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): room_id1, "activity before token2", tok=user2_tok ) - from_token = self.event_sources.get_current_token() + # The `timeline_limit` is set to 4 so we can at least see one historical event + # before the `from_token`. We should see historical events because this is a + # `newly_joined` room. + timeline_limit = 4 + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": timeline_limit, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) # Join the room after the `from_token` which will make us consider this room as # `newly_joined`. 
@@ -2786,42 +2885,23 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): room_id1, "activity after token4", tok=user2_tok ) - # The `timeline_limit` is set to 4 so we can at least see one historical event - # before the `from_token`. We should see historical events because this is a - # `newly_joined` room. - timeline_limit = 4 # Make an incremental Sliding Sync request (what we're trying to test) - channel = self.make_request( - "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": timeline_limit, - } - } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # We should see the new events and the rest should be filled with historical # events which will make us `limited=True` since there are more to paginate to. self.assertEqual( - channel.json_body["rooms"][room_id1]["limited"], + response_body["rooms"][room_id1]["limited"], True, f"Our `timeline_limit` was {timeline_limit} " - + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. ' - + str(channel.json_body["rooms"][room_id1]), + + f'and {len(response_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. ' + + str(response_body["rooms"][room_id1]), ) # Check to make sure that the "live" and historical events are returned self.assertEqual( [ event["event_id"] - for event in channel.json_body["rooms"][room_id1]["timeline"] + for event in response_body["rooms"][room_id1]["timeline"] ], [ event_response2["event_id"], @@ -2829,14 +2909,14 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): event_response3["event_id"], event_response4["event_id"], ], - channel.json_body["rooms"][room_id1]["timeline"], + response_body["rooms"][room_id1]["timeline"], ) # Only events after the `from_token` are "live" (join, event3, event4) self.assertEqual( - channel.json_body["rooms"][room_id1]["num_live"], + response_body["rooms"][room_id1]["num_live"], 3, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) def test_rooms_invite_shared_history_initial_sync(self) -> None: @@ -2873,51 +2953,46 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity after4", tok=user2_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 3, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # `timeline` is omitted for `invite` rooms with `stripped_state` self.assertIsNone( - channel.json_body["rooms"][room_id1].get("timeline"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("timeline"), + response_body["rooms"][room_id1], ) # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("num_live"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("num_live"), + response_body["rooms"][room_id1], ) # `limited` is omitted for `invite` rooms with `stripped_state` (no 
timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("limited"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("limited"), + response_body["rooms"][room_id1], ) # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("prev_batch"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("prev_batch"), + response_body["rooms"][room_id1], ) # `required_state` is omitted for `invite` rooms with `stripped_state` self.assertIsNone( - channel.json_body["rooms"][room_id1].get("required_state"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("required_state"), + response_body["rooms"][room_id1], ) # We should have some `stripped_state` so the potential joiner can identify the # room (we don't care about the order). self.assertCountEqual( - channel.json_body["rooms"][room_id1]["invite_state"], + response_body["rooms"][room_id1]["invite_state"], [ { "content": {"creator": user2_id, "room_version": "10"}, @@ -2944,7 +3019,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "type": "m.room.member", }, ], - channel.json_body["rooms"][room_id1]["invite_state"], + response_body["rooms"][room_id1]["invite_state"], ) def test_rooms_invite_shared_history_incremental_sync(self) -> None: @@ -2980,58 +3055,54 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity after invite3", tok=user2_tok) self.helper.send(room_id1, "activity after invite4", tok=user2_tok) - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) self.helper.send(room_id1, "activity after token5", tok=user2_tok) self.helper.send(room_id1, "activity after toekn6", tok=user2_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 3, - } - } - }, - access_token=user1_tok, + response_body, from_token = self.do_sync( + sync_body, since=from_token, tok=user1_tok ) - self.assertEqual(channel.code, 200, channel.json_body) # `timeline` is omitted for `invite` rooms with `stripped_state` self.assertIsNone( - channel.json_body["rooms"][room_id1].get("timeline"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("timeline"), + response_body["rooms"][room_id1], ) # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("num_live"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("num_live"), + response_body["rooms"][room_id1], ) # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("limited"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("limited"), + response_body["rooms"][room_id1], ) # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("prev_batch"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("prev_batch"), + 
response_body["rooms"][room_id1], ) # `required_state` is omitted for `invite` rooms with `stripped_state` self.assertIsNone( - channel.json_body["rooms"][room_id1].get("required_state"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("required_state"), + response_body["rooms"][room_id1], ) # We should have some `stripped_state` so the potential joiner can identify the # room (we don't care about the order). self.assertCountEqual( - channel.json_body["rooms"][room_id1]["invite_state"], + response_body["rooms"][room_id1]["invite_state"], [ { "content": {"creator": user2_id, "room_version": "10"}, @@ -3058,7 +3129,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "type": "m.room.member", }, ], - channel.json_body["rooms"][room_id1]["invite_state"], + response_body["rooms"][room_id1]["invite_state"], ) def test_rooms_invite_world_readable_history_initial_sync(self) -> None: @@ -3112,52 +3183,47 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity after4", tok=user2_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - # Large enough to see the latest events and before the invite - "timeline_limit": 4, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + # Large enough to see the latest events and before the invite + "timeline_limit": 4, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # `timeline` is omitted for `invite` rooms with `stripped_state` self.assertIsNone( - channel.json_body["rooms"][room_id1].get("timeline"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("timeline"), + response_body["rooms"][room_id1], ) # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("num_live"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("num_live"), + response_body["rooms"][room_id1], ) # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("limited"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("limited"), + response_body["rooms"][room_id1], ) # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("prev_batch"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("prev_batch"), + response_body["rooms"][room_id1], ) # `required_state` is omitted for `invite` rooms with `stripped_state` self.assertIsNone( - channel.json_body["rooms"][room_id1].get("required_state"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("required_state"), + response_body["rooms"][room_id1], ) # We should have some `stripped_state` so the potential joiner can identify the # room (we don't care about the order). 
self.assertCountEqual( - channel.json_body["rooms"][room_id1]["invite_state"], + response_body["rooms"][room_id1]["invite_state"], [ { "content": {"creator": user2_id, "room_version": "10"}, @@ -3184,7 +3250,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "type": "m.room.member", }, ], - channel.json_body["rooms"][room_id1]["invite_state"], + response_body["rooms"][room_id1]["invite_state"], ) def test_rooms_invite_world_readable_history_incremental_sync(self) -> None: @@ -3237,59 +3303,53 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity after invite3", tok=user2_tok) self.helper.send(room_id1, "activity after invite4", tok=user2_tok) - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + # Large enough to see the latest events and before the invite + "timeline_limit": 4, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) self.helper.send(room_id1, "activity after token5", tok=user2_tok) self.helper.send(room_id1, "activity after toekn6", tok=user2_tok) - # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - # Large enough to see the latest events and before the invite - "timeline_limit": 4, - } - } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + # Make the incremental Sliding Sync request + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # `timeline` is omitted for `invite` rooms with `stripped_state` self.assertIsNone( - channel.json_body["rooms"][room_id1].get("timeline"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("timeline"), + response_body["rooms"][room_id1], ) # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("num_live"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("num_live"), + response_body["rooms"][room_id1], ) # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("limited"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("limited"), + response_body["rooms"][room_id1], ) # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) self.assertIsNone( - channel.json_body["rooms"][room_id1].get("prev_batch"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("prev_batch"), + response_body["rooms"][room_id1], ) # `required_state` is omitted for `invite` rooms with `stripped_state` self.assertIsNone( - channel.json_body["rooms"][room_id1].get("required_state"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("required_state"), + response_body["rooms"][room_id1], ) # We should have some `stripped_state` so the potential joiner can identify the # room (we don't care about the order). 
self.assertCountEqual( - channel.json_body["rooms"][room_id1]["invite_state"], + response_body["rooms"][room_id1]["invite_state"], [ { "content": {"creator": user2_id, "room_version": "10"}, @@ -3316,7 +3376,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "type": "m.room.member", }, ], - channel.json_body["rooms"][room_id1]["invite_state"], + response_body["rooms"][room_id1]["invite_state"], ) def test_rooms_ban_initial_sync(self) -> None: @@ -3344,47 +3404,42 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity after6", tok=user2_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 3, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # We should see events before the ban but not after self.assertEqual( [ event["event_id"] - for event in channel.json_body["rooms"][room_id1]["timeline"] + for event in response_body["rooms"][room_id1]["timeline"] ], [ event_response3["event_id"], event_response4["event_id"], user1_ban_response["event_id"], ], - channel.json_body["rooms"][room_id1]["timeline"], + response_body["rooms"][room_id1]["timeline"], ) # No "live" events in an initial sync (no `from_token` to define the "live" # range) self.assertEqual( - channel.json_body["rooms"][room_id1]["num_live"], + response_body["rooms"][room_id1]["num_live"], 0, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) # There are more events to paginate to self.assertEqual( - channel.json_body["rooms"][room_id1]["limited"], + response_body["rooms"][room_id1]["limited"], True, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) def test_rooms_ban_incremental_sync1(self) -> None: @@ -3402,7 +3457,16 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity before2", tok=user2_tok) self.helper.join(room_id1, user1_id, tok=user1_tok) - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok) @@ -3415,48 +3479,33 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity after5", tok=user2_tok) self.helper.send(room_id1, "activity after6", tok=user2_tok) - # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 4, - } - } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + # Make the incremental Sliding Sync request + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # We should see events before the ban but not after self.assertEqual( [ event["event_id"] - for event in channel.json_body["rooms"][room_id1]["timeline"] + for event in 
response_body["rooms"][room_id1]["timeline"] ], [ event_response3["event_id"], event_response4["event_id"], user1_ban_response["event_id"], ], - channel.json_body["rooms"][room_id1]["timeline"], + response_body["rooms"][room_id1]["timeline"], ) # All live events in the incremental sync self.assertEqual( - channel.json_body["rooms"][room_id1]["num_live"], + response_body["rooms"][room_id1]["num_live"], 3, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) # There aren't anymore events to paginate to in this range self.assertEqual( - channel.json_body["rooms"][room_id1]["limited"], + response_body["rooms"][room_id1]["limited"], False, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) def test_rooms_ban_incremental_sync2(self) -> None: @@ -3479,42 +3528,24 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity after3", tok=user2_tok) - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) self.helper.send(room_id1, "activity after4", tok=user2_tok) - # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [], - "timeline_limit": 4, - } - } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + # Make the incremental Sliding Sync request + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # Nothing to see for this banned user in the room in the token range - self.assertIsNone(channel.json_body["rooms"][room_id1].get("timeline")) - # No events returned in the timeline so nothing is "live" - self.assertEqual( - channel.json_body["rooms"][room_id1]["num_live"], - 0, - channel.json_body["rooms"][room_id1], - ) - # There aren't anymore events to paginate to in this range - self.assertEqual( - channel.json_body["rooms"][room_id1]["limited"], - False, - channel.json_body["rooms"][room_id1], - ) + self.assertIsNone(response_body["rooms"].get(room_id1)) def test_rooms_no_required_state(self) -> None: """ @@ -3529,27 +3560,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.join(room_id1, user1_id, tok=user1_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - # Empty `required_state` - "required_state": [], - "timeline_limit": 0, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + # Empty `required_state` + "required_state": [], + "timeline_limit": 0, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # No `required_state` in response self.assertIsNone( - channel.json_body["rooms"][room_id1].get("required_state"), - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1].get("required_state"), + response_body["rooms"][room_id1], ) def test_rooms_required_state_initial_sync(self) -> None: @@ -3566,40 +3592,35 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.join(room_id1, user1_id, tok=user1_tok) # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint, - { - 
"lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [ - [EventTypes.Create, ""], - [EventTypes.RoomHistoryVisibility, ""], - # This one doesn't exist in the room - [EventTypes.Tombstone, ""], - ], - "timeline_limit": 0, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Tombstone, ""], + ], + "timeline_limit": 0, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) state_map = self.get_success( self.storage_controllers.state.get_current_state(room_id1) ) self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], + response_body["rooms"][room_id1]["required_state"], { state_map[(EventTypes.Create, "")], state_map[(EventTypes.RoomHistoryVisibility, "")], }, exact=True, ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) def test_rooms_required_state_incremental_sync(self) -> None: """ @@ -3614,47 +3635,83 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id1, user1_id, tok=user1_tok) - after_room_token = self.event_sources.get_current_token() + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Tombstone, ""], + ], + "timeline_limit": 1, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) - # Make the Sliding Sync request - channel = self.make_request( - "POST", - self.sync_endpoint - + f"?pos={self.get_success(after_room_token.to_string(self.store))}", - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [ - [EventTypes.Create, ""], - [EventTypes.RoomHistoryVisibility, ""], - # This one doesn't exist in the room - [EventTypes.Tombstone, ""], - ], - "timeline_limit": 0, - } + # Send a message so the room comes down sync. + self.helper.send(room_id1, "msg", tok=user1_tok) + + # Make the incremental Sliding Sync request + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # We only return updates but only if we've sent the room down the + # connection before. + self.assertIsNone(response_body["rooms"][room_id1].get("required_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) + + def test_rooms_required_state_incremental_sync_restart(self) -> None: + """ + Test `rooms.required_state` returns requested state events in the room during an + incremental sync, after a restart (and so the in memory caches are reset). 
+ """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Tombstone, ""], + ], + "timeline_limit": 1, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + # Reset the in-memory cache + self.hs.get_sliding_sync_handler().connection_store._connections.clear() + + # Make the Sliding Sync request + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # If the cache has been cleared then we do expect the state to come down state_map = self.get_success( self.storage_controllers.state.get_current_state(room_id1) ) - # The returned state doesn't change from initial to incremental sync. In the - # future, we will only return updates but only if we've sent the room down the - # connection before. self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], + response_body["rooms"][room_id1]["required_state"], { state_map[(EventTypes.Create, "")], state_map[(EventTypes.RoomHistoryVisibility, "")], }, exact=True, ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) def test_rooms_required_state_wildcard(self) -> None: """ @@ -3684,35 +3741,30 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) # Make the Sliding Sync request with wildcards for the `event_type` and `state_key` - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [ - [StateValues.WILDCARD, StateValues.WILDCARD], - ], - "timeline_limit": 0, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [StateValues.WILDCARD, StateValues.WILDCARD], + ], + "timeline_limit": 0, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) state_map = self.get_success( self.storage_controllers.state.get_current_state(room_id1) ) self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], + response_body["rooms"][room_id1]["required_state"], # We should see all the state events in the room state_map.values(), exact=True, ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) def test_rooms_required_state_wildcard_event_type(self) -> None: """ @@ -3743,23 +3795,18 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) # Make the Sliding Sync request with wildcards for the `event_type` - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [ - [StateValues.WILDCARD, user2_id], - ], - "timeline_limit": 0, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [StateValues.WILDCARD, user2_id], + ], + "timeline_limit": 0, } - }, - 
access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) state_map = self.get_success( self.storage_controllers.state.get_current_state(room_id1) @@ -3767,7 +3814,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # We expect at-least any state event with the `user2_id` as the `state_key` self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], + response_body["rooms"][room_id1]["required_state"], { state_map[(EventTypes.Member, user2_id)], state_map[("org.matrix.foo_state", user2_id)], @@ -3776,7 +3823,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # events when the `event_type` is a wildcard. exact=False, ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) def test_rooms_required_state_wildcard_state_key(self) -> None: """ @@ -3792,37 +3839,32 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.join(room_id1, user1_id, tok=user1_tok) # Make the Sliding Sync request with wildcards for the `state_key` - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [ - [EventTypes.Member, StateValues.WILDCARD], - ], - "timeline_limit": 0, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.WILDCARD], + ], + "timeline_limit": 0, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) state_map = self.get_success( self.storage_controllers.state.get_current_state(room_id1) ) self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], + response_body["rooms"][room_id1]["required_state"], { state_map[(EventTypes.Member, user1_id)], state_map[(EventTypes.Member, user2_id)], }, exact=True, ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) def test_rooms_required_state_lazy_loading_room_members(self) -> None: """ @@ -3845,24 +3887,19 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "3", tok=user2_tok) # Make the Sliding Sync request with lazy loading for the room members - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [ - [EventTypes.Create, ""], - [EventTypes.Member, StateValues.LAZY], - ], - "timeline_limit": 3, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 3, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) state_map = self.get_success( self.storage_controllers.state.get_current_state(room_id1) @@ -3870,7 +3907,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # Only user2 and user3 sent events in the 3 events we see in the `timeline` self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], + response_body["rooms"][room_id1]["required_state"], { state_map[(EventTypes.Create, "")], state_map[(EventTypes.Member, 
user2_id)], @@ -3878,7 +3915,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): }, exact=True, ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) def test_rooms_required_state_me(self) -> None: """ @@ -3918,25 +3955,20 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) # Make the Sliding Sync request with a request for '$ME'. - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [ - [EventTypes.Create, ""], - [EventTypes.Member, StateValues.ME], - ["org.matrix.foo", StateValues.ME], - ], - "timeline_limit": 3, - } + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.Member, StateValues.ME], + ["org.matrix.foo", StateValues.ME], + ], + "timeline_limit": 3, } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) state_map = self.get_success( self.storage_controllers.state.get_current_state(room_id1) @@ -3944,7 +3976,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # Only user2 and user3 sent events in the 3 events we see in the `timeline` self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], + response_body["rooms"][room_id1]["required_state"], { state_map[(EventTypes.Create, "")], state_map[(EventTypes.Member, user1_id)], @@ -3952,7 +3984,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): }, exact=True, ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) @parameterized.expand([(Membership.LEAVE,), (Membership.BAN,)]) def test_rooms_required_state_leave_ban(self, stop_membership: str) -> None: @@ -3966,7 +3998,20 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): user3_id = self.register_user("user3", "pass") user3_tok = self.login(user3_id, "pass") - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.Member, "*"], + ["org.matrix.foo_state", ""], + ], + "timeline_limit": 3, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id1, user1_id, tok=user1_tok) @@ -4002,30 +4047,11 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.leave(room_id1, user3_id, tok=user3_tok) # Make the Sliding Sync request with lazy loading for the room members - channel = self.make_request( - "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [ - [EventTypes.Create, ""], - [EventTypes.Member, "*"], - ["org.matrix.foo_state", ""], - ], - "timeline_limit": 3, - } - } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # Only user2 and user3 sent events in the 3 events we see in the `timeline` self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], + response_body["rooms"][room_id1]["required_state"], { state_map[(EventTypes.Create, "")], 
state_map[(EventTypes.Member, user1_id)], @@ -4035,7 +4061,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): }, exact=True, ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) def test_rooms_required_state_combine_superset(self) -> None: """ @@ -4065,45 +4091,40 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) # Make the Sliding Sync request with wildcards for the `state_key` - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [ - [EventTypes.Create, ""], - [EventTypes.Member, user1_id], - ], - "timeline_limit": 0, - }, - "bar-list": { - "ranges": [[0, 1]], - "required_state": [ - [EventTypes.Member, StateValues.WILDCARD], - ["org.matrix.foo_state", ""], - ], - "timeline_limit": 0, - }, + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.Member, user1_id], + ], + "timeline_limit": 0, }, - "room_subscriptions": { - room_id1: { - "required_state": [["org.matrix.bar_state", ""]], - "timeline_limit": 0, - } + "bar-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.WILDCARD], + ["org.matrix.foo_state", ""], + ], + "timeline_limit": 0, }, }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + "room_subscriptions": { + room_id1: { + "required_state": [["org.matrix.bar_state", ""]], + "timeline_limit": 0, + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) state_map = self.get_success( self.storage_controllers.state.get_current_state(room_id1) ) self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], + response_body["rooms"][room_id1]["required_state"], { state_map[(EventTypes.Create, "")], state_map[(EventTypes.Member, user1_id)], @@ -4113,7 +4134,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): }, exact=True, ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) def test_rooms_required_state_partial_state(self) -> None: """ @@ -4136,28 +4157,23 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) # Make the Sliding Sync request (NOT lazy-loading room members) - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], - "required_state": [ - [EventTypes.Create, ""], - ], - "timeline_limit": 0, - }, - } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + ], + "timeline_limit": 0, + }, + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Make sure the list includes room1 but room2 is excluded because it's still # partially-stated self.assertListEqual( - list(channel.json_body["lists"]["foo-list"]["ops"]), + list(response_body["lists"]["foo-list"]["ops"]), [ { "op": "SYNC", @@ -4165,33 +4181,28 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "room_ids": [room_id1], } ], - channel.json_body["lists"]["foo-list"], + response_body["lists"]["foo-list"], ) # Make the Sliding Sync request (with lazy-loading room members) - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": { - "foo-list": { - "ranges": [[0, 1]], 
- "required_state": [ - [EventTypes.Create, ""], - # Lazy-load room members - [EventTypes.Member, StateValues.LAZY], - ], - "timeline_limit": 0, - }, - } - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + # Lazy-load room members + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 0, + }, + } + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # The list should include both rooms now because we're lazy-loading room members self.assertListEqual( - list(channel.json_body["lists"]["foo-list"]["ops"]), + list(response_body["lists"]["foo-list"]["ops"]), [ { "op": "SYNC", @@ -4199,7 +4210,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): "room_ids": [room_id2, room_id1], } ], - channel.json_body["lists"]["foo-list"], + response_body["lists"]["foo-list"], ) def test_room_subscriptions_with_join_membership(self) -> None: @@ -4216,22 +4227,17 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) # Make the Sliding Sync request with just the room subscription - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "room_subscriptions": { - room_id1: { - "required_state": [ - [EventTypes.Create, ""], - ], - "timeline_limit": 1, - } - }, + sync_body = { + "room_subscriptions": { + room_id1: { + "required_state": [ + [EventTypes.Create, ""], + ], + "timeline_limit": 1, + } }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) state_map = self.get_success( self.storage_controllers.state.get_current_state(room_id1) @@ -4239,37 +4245,37 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # We should see some state self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], + response_body["rooms"][room_id1]["required_state"], { state_map[(EventTypes.Create, "")], }, exact=True, ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) # We should see some events self.assertEqual( [ event["event_id"] - for event in channel.json_body["rooms"][room_id1]["timeline"] + for event in response_body["rooms"][room_id1]["timeline"] ], [ join_response["event_id"], ], - channel.json_body["rooms"][room_id1]["timeline"], + response_body["rooms"][room_id1]["timeline"], ) # No "live" events in an initial sync (no `from_token` to define the "live" # range) self.assertEqual( - channel.json_body["rooms"][room_id1]["num_live"], + response_body["rooms"][room_id1]["num_live"], 0, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) # There are more events to paginate to self.assertEqual( - channel.json_body["rooms"][room_id1]["limited"], + response_body["rooms"][room_id1]["limited"], True, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) def test_room_subscriptions_with_leave_membership(self) -> None: @@ -4310,57 +4316,52 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) # Make the Sliding Sync request with just the room subscription - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "room_subscriptions": { - room_id1: { - "required_state": [ - ["org.matrix.foo_state", ""], - ], - "timeline_limit": 2, - } - }, + sync_body = { + 
"room_subscriptions": { + room_id1: { + "required_state": [ + ["org.matrix.foo_state", ""], + ], + "timeline_limit": 2, + } }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # We should see the state at the time of the leave self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], + response_body["rooms"][room_id1]["required_state"], { state_map[("org.matrix.foo_state", "")], }, exact=True, ) - self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) # We should see some before we left (nothing after) self.assertEqual( [ event["event_id"] - for event in channel.json_body["rooms"][room_id1]["timeline"] + for event in response_body["rooms"][room_id1]["timeline"] ], [ join_response["event_id"], leave_response["event_id"], ], - channel.json_body["rooms"][room_id1]["timeline"], + response_body["rooms"][room_id1]["timeline"], ) # No "live" events in an initial sync (no `from_token` to define the "live" # range) self.assertEqual( - channel.json_body["rooms"][room_id1]["num_live"], + response_body["rooms"][room_id1]["num_live"], 0, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) # There are more events to paginate to self.assertEqual( - channel.json_body["rooms"][room_id1]["limited"], + response_body["rooms"][room_id1]["limited"], True, - channel.json_body["rooms"][room_id1], + response_body["rooms"][room_id1], ) def test_room_subscriptions_no_leak_private_room(self) -> None: @@ -4381,27 +4382,20 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) # Make the Sliding Sync request with just the room subscription - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "room_subscriptions": { - room_id1: { - "required_state": [ - [EventTypes.Create, ""], - ], - "timeline_limit": 1, - } - }, + sync_body = { + "room_subscriptions": { + room_id1: { + "required_state": [ + [EventTypes.Create, ""], + ], + "timeline_limit": 1, + } }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # We should not see the room at all (we're not in it) - self.assertIsNone( - channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"] - ) + self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"]) def test_room_subscriptions_world_readable(self) -> None: """ @@ -4444,111 +4438,506 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # Note: We never join the room # Make the Sliding Sync request with just the room subscription - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "room_subscriptions": { - room_id1: { - "required_state": [ - [EventTypes.Create, ""], - ], - "timeline_limit": 1, - } - }, + sync_body = { + "room_subscriptions": { + room_id1: { + "required_state": [ + [EventTypes.Create, ""], + ], + "timeline_limit": 1, + } }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # FIXME: In the future, we should be able to see the room because it's # `world_readable` but currently we don't support this. 
- self.assertIsNone( - channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"] + self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"]) + + def test_rooms_required_state_incremental_sync_LIVE(self) -> None: + """Test that we only get state updates in incremental sync for rooms + we've already seen (LIVE). + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Name, ""], + ], + "timeline_limit": 0, + } + } + } + + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + state_map[(EventTypes.RoomHistoryVisibility, "")], + }, + exact=True, + ) -class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase): - """Tests for the to-device sliding sync extension""" + # Send a state event + self.helper.send_state( + room_id1, EventTypes.Name, body={"name": "foo"}, tok=user2_tok + ) - servlets = [ - synapse.rest.admin.register_servlets, - login.register_servlets, - sync.register_servlets, - sendtodevice.register_servlets, - ] + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) - def default_config(self) -> JsonDict: - config = super().default_config() - # Enable sliding sync - config["experimental_features"] = {"msc3575_enabled": True} - return config + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - self.store = hs.get_datastores().main - self.event_sources = hs.get_event_sources() - self.account_data_handler = hs.get_account_data_handler() - self.notifier = hs.get_notifier() - self.sync_endpoint = ( - "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" + self.assertNotIn("initial", response_body["rooms"][room_id1]) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Name, "")], + }, + exact=True, ) - def _bump_notifier_wait_for_events(self, user_id: str) -> None: + @parameterized.expand([(False,), (True,)]) + def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None: """ - Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding - Sync results. + Test getting room data where we have previously sent down the room, but + we missed sending down some timeline events previously and so its status + is considered PREVIOUSLY. + + There are two versions of this test, one where there are more messages + than the timeline limit, and one where there isn't. 
""" - # We're expecting some new activity from this point onwards - from_token = self.event_sources.get_current_token() - triggered_notifier_wait_for_events = False + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") - async def _on_new_acivity( - before_token: StreamToken, after_token: StreamToken - ) -> bool: - nonlocal triggered_notifier_wait_for_events - triggered_notifier_wait_for_events = True - return True + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) - # Listen for some new activity for the user. We're just trying to confirm that - # our bump below actually does what we think it does (triggers new activity for - # the user). - result_awaitable = self.notifier.wait_for_events( - user_id, - 1000, - _on_new_acivity, - from_token=from_token, + self.helper.send(room_id1, "msg", tok=user1_tok) + + timeline_limit = 5 + conn_id = "conn_id" + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": timeline_limit, + } + }, + "conn_id": "conn_id", + } + + # The first room gets sent down the initial sync + response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok) + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] ) - # Update the account data so that `notifier.wait_for_events(...)` wakes up. - # We're bumping account data because it won't show up in the Sliding Sync - # response so it won't affect whether we have results. - self.get_success( - self.account_data_handler.add_account_data_for_user( - user_id, - "org.matrix.foobarbaz", - {"foo": "bar"}, + # We now send down some events in room1 (depending on the test param). + expected_events = [] # The set of events in the timeline + if limited: + for _ in range(10): + resp = self.helper.send(room_id1, "msg1", tok=user1_tok) + expected_events.append(resp["event_id"]) + else: + resp = self.helper.send(room_id1, "msg1", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # A second messages happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync( + sync_body, since=initial_from_token, tok=user1_tok + ) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # FIXME: This is a hack to record that the first room wasn't sent down + # sync, as we don't implement that currently. + sliding_sync_handler = self.hs.get_sliding_sync_handler() + requester = self.get_success( + self.hs.get_auth().get_user_by_access_token(user1_tok) + ) + sync_config = SlidingSyncConfig( + user=requester.user, + requester=requester, + conn_id=conn_id, + ) + + parsed_initial_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, initial_from_token) + ) + connection_position = self.get_success( + sliding_sync_handler.connection_store.record_rooms( + sync_config, + parsed_initial_from_token, + sent_room_ids=[], + unsent_room_ids=[room_id1], ) ) - # Wait for our notifier result - self.get_success(result_awaitable) + # FIXME: Now fix up `from_token` with new connect position above. 
+ parsed_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ) + parsed_from_token = SlidingSyncStreamToken( + stream_token=parsed_from_token.stream_token, + connection_position=connection_position, + ) + from_token = self.get_success(parsed_from_token.to_string(self.store)) - if not triggered_notifier_wait_for_events: - raise AssertionError( - "Expected `notifier.wait_for_events(...)` to be triggered" + # We now send another event to room1, so we should sync all the missing events. + resp = self.helper.send(room_id1, "msg2", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # This sync should contain the messages from room1 not yet sent down. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + self.assertNotIn("initial", response_body["rooms"][room_id1]) + + self.assertEqual( + [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]], + expected_events[-timeline_limit:], + ) + self.assertEqual(response_body["rooms"][room_id1]["limited"], limited) + self.assertEqual(response_body["rooms"][room_id1].get("required_state"), None) + + def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None: + """ + Test getting room data where we have previously sent down the room, but + we missed sending down some state previously and so its status is + considered PREVIOUSLY. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + + self.helper.send(room_id1, "msg", tok=user1_tok) + + conn_id = "conn_id" + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Name, ""], + ], + "timeline_limit": 0, + } + }, + "conn_id": "conn_id", + } + + # The first room gets sent down the initial sync + response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok) + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + + # We now send down some state in room1 + resp = self.helper.send_state( + room_id1, EventTypes.Name, {"name": "foo"}, tok=user1_tok + ) + name_change_id = resp["event_id"] + + # A second messages happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync( + sync_body, since=initial_from_token, tok=user1_tok + ) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # FIXME: This is a hack to record that the first room wasn't sent down + # sync, as we don't implement that currently. 
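        # Note (illustrative aside, not from the patch itself): as in the timeline
        # variant of this test above, `record_rooms(...)` below records, against a
        # new connection position, which rooms have and have not been sent to this
        # connection; listing room1 under `unsent_room_ids` is what later makes the
        # handler treat it as PREVIOUSLY and replay the missed state.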
+ sliding_sync_handler = self.hs.get_sliding_sync_handler() + requester = self.get_success( + self.hs.get_auth().get_user_by_access_token(user1_tok) + ) + sync_config = SlidingSyncConfig( + user=requester.user, + requester=requester, + conn_id=conn_id, + ) + + parsed_initial_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, initial_from_token) + ) + connection_position = self.get_success( + sliding_sync_handler.connection_store.record_rooms( + sync_config, + parsed_initial_from_token, + sent_room_ids=[], + unsent_room_ids=[room_id1], ) + ) + + # FIXME: Now fix up `from_token` with new connect position above. + parsed_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ) + parsed_from_token = SlidingSyncStreamToken( + stream_token=parsed_from_token.stream_token, + connection_position=connection_position, + ) + from_token = self.get_success(parsed_from_token.to_string(self.store)) + + # We now send another event to room1, so we should sync all the missing state. + self.helper.send(room_id1, "msg", tok=user1_tok) + + # This sync should contain the state changes from room1. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + self.assertNotIn("initial", response_body["rooms"][room_id1]) + + # We should only see the name change. + self.assertEqual( + [ + ev["event_id"] + for ev in response_body["rooms"][room_id1]["required_state"] + ], + [name_change_id], + ) + + def test_rooms_required_state_incremental_sync_NEVER(self) -> None: + """ + Test getting `required_state` where we have NEVER sent down the room before + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + + self.helper.send(room_id1, "msg", tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Name, ""], + ], + "timeline_limit": 1, + } + }, + } + + # A message happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # We now send another event to room1, so we should send down the full + # room. + self.helper.send(room_id1, "msg2", tok=user1_tok) + + # This sync should contain the messages from room1 not yet sent down. 
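        # Note (illustrative aside, not from the patch itself): since room1 has
        # NEVER been sent down on this connection, the response below is expected to
        # treat it as brand new -- `initial` is True and a full `required_state`
        # snapshot comes down, rather than the bare deltas a room already known to
        # the connection (LIVE) receives in the test further up.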
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + state_map[(EventTypes.RoomHistoryVisibility, "")], + }, + exact=True, + ) + + def test_rooms_timeline_incremental_sync_NEVER(self) -> None: + """ + Test getting timeline room data where we have NEVER sent down the room + before + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": 5, + } + }, + } + + expected_events = [] + for _ in range(4): + resp = self.helper.send(room_id1, "msg", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # A message happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # We now send another event to room1 so it comes down sync + resp = self.helper.send(room_id1, "msg2", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # This sync should contain the messages from room1 not yet sent down. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + + self.assertEqual( + [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]], + expected_events, + ) + self.assertEqual(response_body["rooms"][room_id1]["limited"], True) + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + + def test_rooms_with_no_updates_do_not_come_down_incremental_sync(self) -> None: + """ + Test that rooms with no updates are returned in subsequent incremental + syncs. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, + } + } + } + + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Make the incremental Sliding Sync request + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # Nothing has happened in the room, so the room should not come down + # /sync. + self.assertIsNone(response_body["rooms"].get(room_id1)) + + def test_empty_initial_room_comes_down_sync(self) -> None: + """ + Test that rooms come down /sync even with empty required state and + timeline limit in initial sync. 
+ """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, + } + } + } + + # Make the Sliding Sync request + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + + +class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): + """Tests for the to-device sliding sync extension""" + + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + sync.register_servlets, + sendtodevice.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main def _assert_to_device_response( - self, channel: FakeChannel, expected_messages: List[JsonDict] + self, response_body: JsonDict, expected_messages: List[JsonDict] ) -> str: """Assert the sliding sync response was successful and has the expected to-device messages. Returns the next_batch token from the to-device section. """ - self.assertEqual(channel.code, 200, channel.json_body) - extensions = channel.json_body["extensions"] + extensions = response_body["extensions"] to_device = extensions["to_device"] self.assertIsInstance(to_device["next_batch"], str) self.assertEqual(to_device["events"], expected_messages) @@ -4562,22 +4951,18 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase): user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": {}, - "extensions": { - "to_device": { - "enabled": True, - } - }, + sync_body = { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + } }, - access_token=user1_tok, - ) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # We expect no to-device messages - self._assert_to_device_response(channel, []) + self._assert_to_device_response(response_body, []) def test_data_initial_sync(self) -> None: """Test that we get to-device messages when we don't specify a since @@ -4598,21 +4983,17 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase): ) self.assertEqual(chan.code, 200, chan.result) - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": {}, - "extensions": { - "to_device": { - "enabled": True, - } - }, + sync_body = { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + } }, - access_token=user1_tok, - ) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) self._assert_to_device_response( - channel, + response_body, [{"content": test_msg, "sender": user2_id, "type": "m.test"}], ) @@ -4624,21 +5005,17 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase): user2_id = self.register_user("u2", "pass") user2_tok = self.login(user2_id, "pass", "d2") - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": {}, - "extensions": { - "to_device": { - "enabled": True, - } - }, + sync_body: JsonDict = { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + } }, - access_token=user1_tok, - ) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # No to-device messages yet. 
- next_batch = self._assert_to_device_response(channel, []) + next_batch = self._assert_to_device_response(response_body, []) test_msg = {"foo": "bar"} chan = self.make_request( @@ -4649,59 +5026,47 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase): ) self.assertEqual(chan.code, 200, chan.result) - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": {}, - "extensions": { - "to_device": { - "enabled": True, - "since": next_batch, - } - }, + sync_body = { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + "since": next_batch, + } }, - access_token=user1_tok, - ) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) next_batch = self._assert_to_device_response( - channel, + response_body, [{"content": test_msg, "sender": user2_id, "type": "m.test"}], ) # The next sliding sync request should not include the to-device # message. - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": {}, - "extensions": { - "to_device": { - "enabled": True, - "since": next_batch, - } - }, + sync_body = { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + "since": next_batch, + } }, - access_token=user1_tok, - ) - self._assert_to_device_response(channel, []) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + self._assert_to_device_response(response_body, []) # An initial sliding sync request should not include the to-device # message, as it should have been deleted - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": {}, - "extensions": { - "to_device": { - "enabled": True, - } - }, + sync_body = { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + } }, - access_token=user1_tok, - ) - self._assert_to_device_response(channel, []) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + self._assert_to_device_response(response_body, []) def test_wait_for_new_data(self) -> None: """ @@ -4714,22 +5079,21 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase): user2_id = self.register_user("u2", "pass") user2_tok = self.login(user2_id, "pass", "d2") - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + "?timeout=10000" - + f"&pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": {}, - "extensions": { - "to_device": { - "enabled": True, - } - }, - }, + self.sync_endpoint + "?timeout=10000" + f"&pos={from_token}", + content=sync_body, access_token=user1_tok, await_result=False, ) @@ -4752,7 +5116,7 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200, channel.json_body) self._assert_to_device_response( - channel, + channel.json_body, [{"content": test_msg, "sender": user2_id, "type": "m.test"}], ) @@ -4765,22 +5129,21 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase): user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + "?timeout=10000" 
- + f"&pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": {}, - "extensions": { - "to_device": { - "enabled": True, - } - }, - }, + self.sync_endpoint + "?timeout=10000" + f"&pos={from_token}", + content=sync_body, access_token=user1_tok, await_result=False, ) @@ -4789,7 +5152,9 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase): channel.await_result(timeout_ms=5000) # Wake-up `notifier.wait_for_events(...)` that will cause us test # `SlidingSyncResult.__bool__` for new results. - self._bump_notifier_wait_for_events(user1_id) + self._bump_notifier_wait_for_events( + user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA + ) # Block for a little bit more to ensure we don't see any new results. with self.assertRaises(TimedOutException): channel.await_result(timeout_ms=4000) @@ -4798,10 +5163,10 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase): channel.await_result(timeout_ms=1200) self.assertEqual(channel.code, 200, channel.json_body) - self._assert_to_device_response(channel, []) + self._assert_to_device_response(channel.json_body, []) -class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): +class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase): """Tests for the e2ee sliding sync extension""" servlets = [ @@ -4812,67 +5177,9 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): devices.register_servlets, ] - def default_config(self) -> JsonDict: - config = super().default_config() - # Enable sliding sync - config["experimental_features"] = {"msc3575_enabled": True} - return config - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main - self.event_sources = hs.get_event_sources() self.e2e_keys_handler = hs.get_e2e_keys_handler() - self.account_data_handler = hs.get_account_data_handler() - self.notifier = hs.get_notifier() - self.sync_endpoint = ( - "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" - ) - - def _bump_notifier_wait_for_events(self, user_id: str) -> None: - """ - Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding - Sync results. - """ - # We're expecting some new activity from this point onwards - from_token = self.event_sources.get_current_token() - - triggered_notifier_wait_for_events = False - - async def _on_new_acivity( - before_token: StreamToken, after_token: StreamToken - ) -> bool: - nonlocal triggered_notifier_wait_for_events - triggered_notifier_wait_for_events = True - return True - - # Listen for some new activity for the user. We're just trying to confirm that - # our bump below actually does what we think it does (triggers new activity for - # the user). - result_awaitable = self.notifier.wait_for_events( - user_id, - 1000, - _on_new_acivity, - from_token=from_token, - ) - - # Update the account data so that `notifier.wait_for_events(...)` wakes up. - # We're bumping account data because it won't show up in the Sliding Sync - # response so it won't affect whether we have results. 
- self.get_success( - self.account_data_handler.add_account_data_for_user( - user_id, - "org.matrix.foobarbaz", - {"foo": "bar"}, - ) - ) - - # Wait for our notifier result - self.get_success(result_awaitable) - - if not triggered_notifier_wait_for_events: - raise AssertionError( - "Expected `notifier.wait_for_events(...)` to be triggered" - ) def test_no_data_initial_sync(self) -> None: """ @@ -4883,27 +5190,22 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): user1_tok = self.login(user1_id, "pass") # Make an initial Sliding Sync request with the e2ee extension enabled - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": {}, - "extensions": { - "e2ee": { - "enabled": True, - } - }, + sync_body = { + "lists": {}, + "extensions": { + "e2ee": { + "enabled": True, + } }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Device list updates are only present for incremental syncs - self.assertIsNone(channel.json_body["extensions"]["e2ee"].get("device_lists")) + self.assertIsNone(response_body["extensions"]["e2ee"].get("device_lists")) # Both of these should be present even when empty self.assertEqual( - channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"], + response_body["extensions"]["e2ee"]["device_one_time_keys_count"], { # This is always present because of # https://github.com/element-hq/element-android/issues/3725 and @@ -4912,7 +5214,7 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): }, ) self.assertEqual( - channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"], + response_body["extensions"]["e2ee"]["device_unused_fallback_key_types"], [], ) @@ -4924,40 +5226,32 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": {}, + "extensions": { + "e2ee": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) # Make an incremental Sliding Sync request with the e2ee extension enabled - channel = self.make_request( - "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": {}, - "extensions": { - "e2ee": { - "enabled": True, - } - }, - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # Device list shows up for incremental syncs self.assertEqual( - channel.json_body["extensions"]["e2ee"] - .get("device_lists", {}) - .get("changed"), + response_body["extensions"]["e2ee"].get("device_lists", {}).get("changed"), [], ) self.assertEqual( - channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"), + response_body["extensions"]["e2ee"].get("device_lists", {}).get("left"), [], ) # Both of these should be present even when empty self.assertEqual( - channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"], + response_body["extensions"]["e2ee"]["device_one_time_keys_count"], { # Note that "signed_curve25519" is always returned in key count responses # regardless of whether we uploaded any keys for it. 
This is necessary until @@ -4970,7 +5264,7 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): }, ) self.assertEqual( - channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"], + response_body["extensions"]["e2ee"]["device_unused_fallback_key_types"], [], ) @@ -4992,22 +5286,21 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): self.helper.join(room_id, user1_id, tok=user1_tok) self.helper.join(room_id, user3_id, tok=user3_tok) - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": {}, + "extensions": { + "e2ee": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + "?timeout=10000" - + f"&pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": {}, - "extensions": { - "e2ee": { - "enabled": True, - } - }, - }, + self.sync_endpoint + "?timeout=10000" + f"&pos={from_token}", + content=sync_body, access_token=user1_tok, await_result=False, ) @@ -5053,22 +5346,21 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": {}, + "extensions": { + "e2ee": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + "?timeout=10000" - + f"&pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": {}, - "extensions": { - "e2ee": { - "enabled": True, - } - }, - }, + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, access_token=user1_tok, await_result=False, ) @@ -5077,7 +5369,9 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): channel.await_result(timeout_ms=5000) # Wake-up `notifier.wait_for_events(...)` that will cause us test # `SlidingSyncResult.__bool__` for new results. - self._bump_notifier_wait_for_events(user1_id) + self._bump_notifier_wait_for_events( + user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA + ) # Block for a little bit more to ensure we don't see any new results. 
with self.assertRaises(TimedOutException): channel.await_result(timeout_ms=4000) @@ -5138,7 +5432,15 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): self.helper.join(room_id, user3_id, tok=user3_tok) self.helper.join(room_id, user4_id, tok=user4_tok) - from_token = self.event_sources.get_current_token() + sync_body = { + "lists": {}, + "extensions": { + "e2ee": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) # Have user3 update their device list channel = self.make_request( @@ -5155,31 +5457,15 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): self.helper.leave(room_id, user4_id, tok=user4_tok) # Make an incremental Sliding Sync request with the e2ee extension enabled - channel = self.make_request( - "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", - { - "lists": {}, - "extensions": { - "e2ee": { - "enabled": True, - } - }, - }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # Device list updates show up self.assertEqual( - channel.json_body["extensions"]["e2ee"] - .get("device_lists", {}) - .get("changed"), + response_body["extensions"]["e2ee"].get("device_lists", {}).get("changed"), [user3_id], ) self.assertEqual( - channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"), + response_body["extensions"]["e2ee"].get("device_lists", {}).get("left"), [user4_id], ) @@ -5221,24 +5507,19 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): ) # Make a Sliding Sync request with the e2ee extension enabled - channel = self.make_request( - "POST", - self.sync_endpoint, - { - "lists": {}, - "extensions": { - "e2ee": { - "enabled": True, - } - }, + sync_body = { + "lists": {}, + "extensions": { + "e2ee": { + "enabled": True, + } }, - access_token=user1_tok, - ) - self.assertEqual(channel.code, 200, channel.json_body) + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) # Check for those one time key counts self.assertEqual( - channel.json_body["extensions"]["e2ee"].get("device_one_time_keys_count"), + response_body["extensions"]["e2ee"].get("device_one_time_keys_count"), { "alg1": 1, "alg2": 2, @@ -5282,25 +5563,747 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase): self.assertEqual(fallback_res, ["alg1"], fallback_res) # Make a Sliding Sync request with the e2ee extension enabled + sync_body = { + "lists": {}, + "extensions": { + "e2ee": { + "enabled": True, + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # Check for the unused fallback key types + self.assertListEqual( + response_body["extensions"]["e2ee"].get("device_unused_fallback_key_types"), + ["alg1"], + ) + + +class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase): + """Tests for the account_data sliding sync extension""" + + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + room.register_servlets, + sync.register_servlets, + sendtodevice.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.account_data_handler = hs.get_account_data_handler() + + def test_no_data_initial_sync(self) -> None: + """ + Test that enabling the account_data extension works during an intitial sync, + even if there is no-data. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Make an initial Sliding Sync request with the account_data extension enabled + sync_body = { + "lists": {}, + "extensions": { + "account_data": { + "enabled": True, + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + self.assertIncludes( + { + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) + }, + # Even though we don't have any global account data set, Synapse saves some + # default push rules for us. + {AccountDataTypes.PUSH_RULES}, + exact=True, + ) + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, + ) + + def test_no_data_incremental_sync(self) -> None: + """ + Test that enabling account_data extension works during an incremental sync, even + if there is no-data. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + sync_body = { + "lists": {}, + "extensions": { + "account_data": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Make an incremental Sliding Sync request with the account_data extension enabled + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # There has been no account data changes since the `from_token` so we shouldn't + # see any account data here. + self.assertIncludes( + { + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) + }, + set(), + exact=True, + ) + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, + ) + + def test_global_account_data_initial_sync(self) -> None: + """ + On initial sync, we should return all global account data on initial sync. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Update the global account data + self.get_success( + self.account_data_handler.add_account_data_for_user( + user_id=user1_id, + account_data_type="org.matrix.foobarbaz", + content={"foo": "bar"}, + ) + ) + + # Make an initial Sliding Sync request with the account_data extension enabled + sync_body = { + "lists": {}, + "extensions": { + "account_data": { + "enabled": True, + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # It should show us all of the global account data + self.assertIncludes( + { + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) + }, + {AccountDataTypes.PUSH_RULES, "org.matrix.foobarbaz"}, + exact=True, + ) + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, + ) + + def test_global_account_data_incremental_sync(self) -> None: + """ + On incremental sync, we should only account data that has changed since the + `from_token`. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Add some global account data + self.get_success( + self.account_data_handler.add_account_data_for_user( + user_id=user1_id, + account_data_type="org.matrix.foobarbaz", + content={"foo": "bar"}, + ) + ) + + sync_body = { + "lists": {}, + "extensions": { + "account_data": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Add some other global account data + self.get_success( + self.account_data_handler.add_account_data_for_user( + user_id=user1_id, + account_data_type="org.matrix.doodardaz", + content={"doo": "dar"}, + ) + ) + + # Make an incremental Sliding Sync request with the account_data extension enabled + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertIncludes( + { + global_event["type"] + for global_event in response_body["extensions"]["account_data"].get( + "global" + ) + }, + # We should only see the new global account data that happened after the `from_token` + {"org.matrix.doodardaz"}, + exact=True, + ) + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, + ) + + def test_room_account_data_initial_sync(self) -> None: + """ + On initial sync, we return all account data for a given room but only for + rooms that we request and are being returned in the Sliding Sync response. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a room and add some room account data + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Create another room with some room account data + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Make an initial Sliding Sync request with the account_data extension enabled + sync_body = { + "lists": {}, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + } + }, + "extensions": { + "account_data": { + "enabled": True, + "rooms": [room_id1, room_id2], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + self.assertIsNotNone(response_body["extensions"]["account_data"].get("global")) + # Even though we requested room2, we only expect room1 to show up because that's + # the only room in the Sliding Sync response (room2 is not one of our room + # subscriptions or in a sliding window list). + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id1}, + exact=True, + ) + self.assertIncludes( + { + event["type"] + for event in response_body["extensions"]["account_data"] + .get("rooms") + .get(room_id1) + }, + {"org.matrix.roorarraz"}, + exact=True, + ) + + def test_room_account_data_incremental_sync(self) -> None: + """ + On incremental sync, we return all account data for a given room but only for + rooms that we request and are being returned in the Sliding Sync response. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a room and add some room account data + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Create another room with some room account data + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + sync_body = { + "lists": {}, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + } + }, + "extensions": { + "account_data": { + "enabled": True, + "rooms": [room_id1, room_id2], + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Add some other room account data + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz2", + content={"roo": "rar"}, + ) + ) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz2", + content={"roo": "rar"}, + ) + ) + + # Make an incremental Sliding Sync request with the account_data extension enabled + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertIsNotNone(response_body["extensions"]["account_data"].get("global")) + # Even though we requested room2, we only expect room1 to show up because that's + # the only room in the Sliding Sync response (room2 is not one of our room + # subscriptions or in a sliding window list). + self.assertIncludes( + response_body["extensions"]["account_data"].get("rooms").keys(), + {room_id1}, + exact=True, + ) + # We should only see the new room account data that happened after the `from_token` + self.assertIncludes( + { + event["type"] + for event in response_body["extensions"]["account_data"] + .get("rooms") + .get(room_id1) + }, + {"org.matrix.roorarraz2"}, + exact=True, + ) + + def test_room_account_data_relevant_rooms(self) -> None: + """ + Test out different variations of `lists`/`rooms` we are requesting account data for. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a room and add some room account data + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id1, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Create another room with some room account data + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id2, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Create another room with some room account data + room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id3, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Create another room with some room account data + room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id4, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + # Create another room with some room account data + room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.get_success( + self.account_data_handler.add_account_data_to_room( + user_id=user1_id, + room_id=room_id5, + account_data_type="org.matrix.roorarraz", + content={"roo": "rar"}, + ) + ) + + room_id_to_human_name_map = { + room_id1: "room1", + room_id2: "room2", + room_id3: "room3", + room_id4: "room4", + room_id5: "room5", + } + + # Mix lists and rooms + sync_body = { + "lists": { + # We expect this list range to include room5 and room4 + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, + }, + # We expect this list range to include room5, room4, room3 + "bar-list": { + "ranges": [[0, 2]], + "required_state": [], + "timeline_limit": 0, + }, + }, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + } + }, + "extensions": { + "account_data": { + "enabled": True, + "lists": ["foo-list", "non-existent-list"], + "rooms": [room_id1, room_id2, "!non-existent-room"], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ✅ Requested via `rooms` and a room subscription exists + # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions) + # room3: ❌ Not requested + # room4: ✅ Shows up because requested via `lists` and list exists in the response + # room5: ✅ Shows up because requested via `lists` and list exists in the response + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"]["account_data"] + .get("rooms") + .keys() + }, + {"room1", "room4", "room5"}, + exact=True, + ) + + # Try wildcards (this is the default) + sync_body = { + "lists": { + # We expect this list range to include room5 and room4 + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, + }, + # We expect this list range to include room5, room4, room3 + "bar-list": { + "ranges": [[0, 2]], + "required_state": [], + "timeline_limit": 0, + }, + }, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + } + }, + "extensions": { + "account_data": { + 
"enabled": True, + # "lists": ["*"], + # "rooms": ["*"], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions + # room2: ❌ Not requested + # room3: ✅ Shows up because of default `lists` wildcard and is in a list + # room4: ✅ Shows up because of default `lists` wildcard and is in a list + # room5: ✅ Shows up because of default `lists` wildcard and is in a list + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"]["account_data"] + .get("rooms") + .keys() + }, + {"room1", "room3", "room4", "room5"}, + exact=True, + ) + + # Empty list will return nothing + sync_body = { + "lists": { + # We expect this list range to include room5 and room4 + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, + }, + # We expect this list range to include room5, room4, room3 + "bar-list": { + "ranges": [[0, 2]], + "required_state": [], + "timeline_limit": 0, + }, + }, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + } + }, + "extensions": { + "account_data": { + "enabled": True, + "lists": [], + "rooms": [], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ❌ Not requested + # room2: ❌ Not requested + # room3: ❌ Not requested + # room4: ❌ Not requested + # room5: ❌ Not requested + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"]["account_data"] + .get("rooms") + .keys() + }, + set(), + exact=True, + ) + + # Try wildcard and none + sync_body = { + "lists": { + # We expect this list range to include room5 and room4 + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 0, + }, + # We expect this list range to include room5, room4, room3 + "bar-list": { + "ranges": [[0, 2]], + "required_state": [], + "timeline_limit": 0, + }, + }, + "room_subscriptions": { + room_id1: { + "required_state": [], + "timeline_limit": 0, + } + }, + "extensions": { + "account_data": { + "enabled": True, + "lists": ["*"], + "rooms": [], + } + }, + } + response_body, _ = self.do_sync(sync_body, tok=user1_tok) + + # room1: ❌ Not requested + # room2: ❌ Not requested + # room3: ✅ Shows up because of default `lists` wildcard and is in a list + # room4: ✅ Shows up because of default `lists` wildcard and is in a list + # room5: ✅ Shows up because of default `lists` wildcard and is in a list + self.assertIncludes( + { + room_id_to_human_name_map[room_id] + for room_id in response_body["extensions"]["account_data"] + .get("rooms") + .keys() + }, + {"room3", "room4", "room5"}, + exact=True, + ) + + def test_wait_for_new_data(self) -> None: + """ + Test to make sure that the Sliding Sync request waits for new data to arrive. 
+ + (Only applies to incremental syncs with a `timeout` specified) + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id, user1_id, tok=user1_tok) + + sync_body = { + "lists": {}, + "extensions": { + "account_data": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Make an incremental Sliding Sync request with the account_data extension enabled channel = self.make_request( "POST", - self.sync_endpoint, + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, + access_token=user1_tok, + await_result=False, + ) + # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)` + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=5000) + # Bump the global account data to trigger new results + self.get_success( + self.account_data_handler.add_account_data_for_user( + user1_id, + "org.matrix.foobarbaz", + {"foo": "bar"}, + ) + ) + # Should respond before the 10 second timeout + channel.await_result(timeout_ms=3000) + self.assertEqual(channel.code, 200, channel.json_body) + + # We should see the global account data update + self.assertIncludes( { - "lists": {}, - "extensions": { - "e2ee": { - "enabled": True, - } - }, + global_event["type"] + for global_event in channel.json_body["extensions"]["account_data"].get( + "global" + ) }, + {"org.matrix.foobarbaz"}, + exact=True, + ) + self.assertIncludes( + channel.json_body["extensions"]["account_data"].get("rooms").keys(), + set(), + exact=True, + ) + + def test_wait_for_new_data_timeout(self) -> None: + """ + Test to make sure that the Sliding Sync request waits for new data to arrive but + no data ever arrives so we timeout. We're also making sure that the default data + from the account_data extension doesn't trigger a false-positive for new data. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + sync_body = { + "lists": {}, + "extensions": { + "account_data": { + "enabled": True, + } + }, + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, access_token=user1_tok, + await_result=False, ) + # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)` + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=5000) + # Wake-up `notifier.wait_for_events(...)` that will cause us test + # `SlidingSyncResult.__bool__` for new results. + self._bump_notifier_wait_for_events( + user1_id, + # We choose `StreamKeyType.PRESENCE` because we're testing for account data + # and don't want to contaminate the account data results using + # `StreamKeyType.ACCOUNT_DATA`. + wake_stream_key=StreamKeyType.PRESENCE, + ) + # Block for a little bit more to ensure we don't see any new results. 
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=4000)
+        # Wait for the sync to complete (wait for the rest of the 10 second timeout,
+        # 5000 + 4000 + 1200 > 10000)
+        channel.await_result(timeout_ms=1200)
         self.assertEqual(channel.code, 200, channel.json_body)

-        # Check for the unused fallback key types
-        self.assertListEqual(
-            channel.json_body["extensions"]["e2ee"].get(
-                "device_unused_fallback_key_types"
-            ),
-            ["alg1"],
+        self.assertIsNotNone(
+            channel.json_body["extensions"]["account_data"].get("global")
+        )
+        self.assertIsNotNone(
+            channel.json_body["extensions"]["account_data"].get("rooms")
         )
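
For reference, the account_data tests above all follow the same round trip: enable the extension, take `pos` from the initial response, then expect only post-`pos` changes on the next request. Below is a minimal sketch of that pattern, assuming the `SlidingSyncAccountDataExtensionTestCase` scaffolding from this diff; the method name and the `org.example.*` account-data types are illustrative placeholders, while the helpers (`do_sync`, `account_data_handler.add_account_data_to_room`) and the response shape follow the tests above.

    def test_account_data_extension_round_trip(self) -> None:
        """
        Illustrative sketch: initial sync returns existing room account data,
        incremental sync returns only what changed after the `pos` token.
        """
        user1_id = self.register_user("user1", "pass")
        user1_tok = self.login(user1_id, "pass")
        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)

        # Room account data that exists before the first request
        # ("org.example.initial" is a made-up type for illustration).
        self.get_success(
            self.account_data_handler.add_account_data_to_room(
                user_id=user1_id,
                room_id=room_id,
                account_data_type="org.example.initial",
                content={"foo": "bar"},
            )
        )

        sync_body = {
            "lists": {},
            "room_subscriptions": {
                room_id: {"required_state": [], "timeline_limit": 0},
            },
            "extensions": {
                "account_data": {
                    "enabled": True,
                    "rooms": [room_id],
                }
            },
        }

        # Initial sync: global account data (at least the default push rules)
        # plus the pre-existing room account data comes back.
        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
        self.assertIn(
            AccountDataTypes.PUSH_RULES,
            {
                global_event["type"]
                for global_event in response_body["extensions"]["account_data"]["global"]
            },
        )
        self.assertIn(
            "org.example.initial",
            {
                event["type"]
                for event in response_body["extensions"]["account_data"]["rooms"][room_id]
            },
        )

        # Change the room account data, then sync again from `from_token`:
        # only the change made after that position should be returned.
        self.get_success(
            self.account_data_handler.add_account_data_to_room(
                user_id=user1_id,
                room_id=room_id,
                account_data_type="org.example.update",
                content={"foo": "baz"},
            )
        )
        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
        self.assertEqual(
            {
                event["type"]
                for event in response_body["extensions"]["account_data"]["rooms"][room_id]
            },
            {"org.example.update"},
        )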