diff options
author | Mathieu Velten <mathieuv@matrix.org> | 2022-07-28 12:25:11 +0200 |
---|---|---|
committer | Mathieu Velten <mathieuv@matrix.org> | 2022-08-02 15:08:12 +0200 |
commit | dfca02b6deb1cc29859c370efecab4fcf2179fd4 (patch) | |
tree | 7c3e4cc8fcacc97c0f73567a869c0b999446791e | |
parent | Fix error when out of servers to sync partial state with (#13432) (diff) | |
download | synapse-dfca02b6deb1cc29859c370efecab4fcf2179fd4.tar.xz |
Add do_not_use_to_device_limit to sync filter
-rw-r--r-- | changelog.d/13412.feature | 1 | ||||
-rw-r--r-- | synapse/api/filtering.py | 10 | ||||
-rw-r--r-- | synapse/config/experimental.py | 6 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/deviceinbox.py | 7 | ||||
-rw-r--r-- | tests/rest/client/test_sync.py | 127 |
6 files changed, 151 insertions, 6 deletions
diff --git a/changelog.d/13412.feature b/changelog.d/13412.feature new file mode 100644 index 0000000000..6efc2678e0 --- /dev/null +++ b/changelog.d/13412.feature @@ -0,0 +1 @@ +Add do_not_use_to_device_limit to sync filter. This is an experiment to see if it improves client behavior. diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index b007147519..b38f1abf63 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -134,6 +134,9 @@ USER_FILTER_SCHEMA = { "pattern": r"^((?!\\\\).)*$", }, }, + # This is an experiment, a MSC will follow if it happens to be useful + # for clients sync performance + "do_not_use_to_device_limit": {"type": "number"}, }, "additionalProperties": False, } @@ -219,6 +222,13 @@ class FilterCollection: self.event_fields = filter_json.get("event_fields", []) self.event_format = filter_json.get("event_format", "client") + self.to_device_limit = 100 + if hs.config.experimental.to_device_limit_enabled: + self.to_device_limit = filter_json.get("do_not_use_to_device_limit", 100) + # We don't want to overload the server so let's keep the limit under a thousand + if self.to_device_limit > 1000: + self.to_device_limit = 1000 + def __repr__(self) -> str: return "<FilterCollection %s>" % (json.dumps(self._filter_json),) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index c2ecd977cd..06f2e3f121 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -90,3 +90,9 @@ class ExperimentalConfig(Config): # MSC3848: Introduce errcodes for specific event sending failures self.msc3848_enabled: bool = experimental.get("msc3848_enabled", False) + + # Experimental feature to optimize client sync performance + # Will become a proper MSC if it appears to be useful + self.to_device_limit_enabled: bool = experimental.get( + "to_device_limit_enabled", False + ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d42a414c90..142068f366 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1329,7 +1329,11 @@ class SyncHandler: if device_id is not None and since_stream_id != int(now_token.to_device_key): messages, stream_id = await self.store.get_messages_for_device( - user_id, device_id, since_stream_id, now_token.to_device_key + user_id, + device_id, + since_stream_id, + now_token.to_device_key, + sync_result_builder.sync_config.filter_collection.to_device_limit, ) for message in messages: diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 73c95ffb6f..0bd7ceefeb 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -229,10 +229,6 @@ class DeviceInboxWorkerStore(SQLBaseStore): limit=limit, ) - if not user_id_device_id_to_messages: - # There were no messages! - return [], to_stream_id - # Extract the messages, no need to return the user and device ID again to_device_messages = user_id_device_id_to_messages.get((user_id, device_id), []) @@ -297,6 +293,9 @@ class DeviceInboxWorkerStore(SQLBaseStore): "without a specific user_id/device_id" ) + if limit == 0: + return {}, from_stream_id + user_ids_to_query: Set[str] = set() device_ids_to_query: Set[str] = set() diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index b085c50356..c3c0ac3849 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -28,7 +28,16 @@ from synapse.api.constants import ( ReceiptTypes, RelationTypes, ) -from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync +from synapse.rest.client import ( + devices, + knock, + login, + read_marker, + receipts, + room, + sendtodevice, + sync, +) from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock @@ -948,3 +957,119 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase): self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["invite"]) self.assertIn(self.included_room_id, channel.json_body["rooms"]["invite"]) + + +class ToDeviceLimitTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + sendtodevice.register_servlets, + sync.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.next_batch = "s0" + + # Register the first user (used to check the received to_device messages). + self.user_id = self.register_user("kermit", "monkey") + self.tok = self.login("kermit", "monkey") + + # Register the second user (used to send to_device messages to user1device). + self.user2 = self.register_user("kermit2", "monkey") + self.tok2 = self.login("kermit2", "monkey") + + self.tx_id = 0 + + # This will send one to_device message from kermit device to all kermit2 devices + def _send_to_device(self) -> None: + self.tx_id += 1 + chan = self.make_request( + "PUT", + f"/_matrix/client/v3/sendToDevice/m.test/{self.tx_id}", + content={"messages": {self.user_id: {"*": {"tx_id": self.tx_id}}}}, + access_token=self.tok2, + ) + self.assertEqual(chan.code, 200, chan.result) + + # This does an incremental sync for user kermit with do_not_use_to_device_limit + # setted and check the number of returned to_device msgs against + # expected_to_device_msgs value + def _limited_sync_and_check( + self, to_device_limit: int, expected_to_device_msgs: int + ) -> None: + channel = self.make_request( + "GET", + f'/sync?since={self.next_batch}&filter={{"do_not_use_to_device_limit": {to_device_limit}}}', + access_token=self.tok, + ) + self.assertEqual(channel.code, 200) + self.next_batch = channel.json_body["next_batch"] + + if expected_to_device_msgs > 0: + self.assertIn("to_device", channel.json_body) + self.assertIn("events", channel.json_body["to_device"]) + self.assertEqual( + expected_to_device_msgs, len(channel.json_body["to_device"]["events"]) + ) + + def test_to_device(self) -> None: + """Tests that to_device messages are correctly flowing to sync, + and that to_device_limit is ignored when the experimetal feature is not enabled. + """ + channel = self.make_request( + "GET", + "/sync", + access_token=self.tok, + ) + self.assertEqual(channel.code, 200) + self.next_batch = channel.json_body["next_batch"] + + for _ in range(4): + self._send_to_device() + + # 100 is the default limit, we should get back our 4 messages + self._limited_sync_and_check(100, 4) + + for _ in range(4): + self._send_to_device() + + # limit of 3 is setted but the experimental feature is not enabled, + # so we are still expecting 4 messages + self._limited_sync_and_check(3, 4) + + @override_config( + { + "experimental_features": { + "to_device_limit_enabled": True, + } + } + ) + def test_to_device_limit(self) -> None: + """Tests that to_device messages are correctly batched in incremental syncs + according to the specified to_device_limit. The limit can change between sync calls. + """ + channel = self.make_request( + "GET", + "/sync", + access_token=self.tok, + ) + self.assertEqual(channel.code, 200) + self.next_batch = channel.json_body["next_batch"] + + for _ in range(8): + self._send_to_device() + + self._limited_sync_and_check(3, 3) + self._limited_sync_and_check(4, 4) + self._limited_sync_and_check(0, 0) + self._limited_sync_and_check(3, 1) + + self._limited_sync_and_check(3, 0) + + for _ in range(1100): + self._send_to_device() + + # This tests the hardcoded 1000 limit used to avoid + # overloading a server + self._limited_sync_and_check(2000, 1000) + self._limited_sync_and_check(2000, 100) |