summary refs log tree commit diff
diff options
context:
space:
mode:
authorMathieu Velten <mathieuv@matrix.org>2022-07-28 12:25:11 +0200
committerMathieu Velten <mathieuv@matrix.org>2022-08-02 15:08:12 +0200
commitdfca02b6deb1cc29859c370efecab4fcf2179fd4 (patch)
tree7c3e4cc8fcacc97c0f73567a869c0b999446791e
parentFix error when out of servers to sync partial state with (#13432) (diff)
downloadsynapse-dfca02b6deb1cc29859c370efecab4fcf2179fd4.tar.xz
Add do_not_use_to_device_limit to sync filter
-rw-r--r--changelog.d/13412.feature1
-rw-r--r--synapse/api/filtering.py10
-rw-r--r--synapse/config/experimental.py6
-rw-r--r--synapse/handlers/sync.py6
-rw-r--r--synapse/storage/databases/main/deviceinbox.py7
-rw-r--r--tests/rest/client/test_sync.py127
6 files changed, 151 insertions, 6 deletions
diff --git a/changelog.d/13412.feature b/changelog.d/13412.feature
new file mode 100644
index 0000000000..6efc2678e0
--- /dev/null
+++ b/changelog.d/13412.feature
@@ -0,0 +1 @@
+Add do_not_use_to_device_limit to sync filter. This is an experiment to see if it improves client behavior.
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index b007147519..b38f1abf63 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -134,6 +134,9 @@ USER_FILTER_SCHEMA = {
                 "pattern": r"^((?!\\\\).)*$",
             },
         },
+        # This is an experiment, a MSC will follow if it happens to be useful
+        # for clients sync performance
+        "do_not_use_to_device_limit": {"type": "number"},
     },
     "additionalProperties": False,
 }
@@ -219,6 +222,13 @@ class FilterCollection:
         self.event_fields = filter_json.get("event_fields", [])
         self.event_format = filter_json.get("event_format", "client")
 
+        self.to_device_limit = 100
+        if hs.config.experimental.to_device_limit_enabled:
+            self.to_device_limit = filter_json.get("do_not_use_to_device_limit", 100)
+            # We don't want to overload the server so let's keep the limit under a thousand
+            if self.to_device_limit > 1000:
+                self.to_device_limit = 1000
+
     def __repr__(self) -> str:
         return "<FilterCollection %s>" % (json.dumps(self._filter_json),)
 
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index c2ecd977cd..06f2e3f121 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -90,3 +90,9 @@ class ExperimentalConfig(Config):
 
         # MSC3848: Introduce errcodes for specific event sending failures
         self.msc3848_enabled: bool = experimental.get("msc3848_enabled", False)
+
+        # Experimental feature to optimize client sync performance
+        # Will become a proper MSC if it appears to be useful
+        self.to_device_limit_enabled: bool = experimental.get(
+            "to_device_limit_enabled", False
+        )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d42a414c90..142068f366 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1329,7 +1329,11 @@ class SyncHandler:
 
         if device_id is not None and since_stream_id != int(now_token.to_device_key):
             messages, stream_id = await self.store.get_messages_for_device(
-                user_id, device_id, since_stream_id, now_token.to_device_key
+                user_id,
+                device_id,
+                since_stream_id,
+                now_token.to_device_key,
+                sync_result_builder.sync_config.filter_collection.to_device_limit,
             )
 
             for message in messages:
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 73c95ffb6f..0bd7ceefeb 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -229,10 +229,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             limit=limit,
         )
 
-        if not user_id_device_id_to_messages:
-            # There were no messages!
-            return [], to_stream_id
-
         # Extract the messages, no need to return the user and device ID again
         to_device_messages = user_id_device_id_to_messages.get((user_id, device_id), [])
 
@@ -297,6 +293,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 "without a specific user_id/device_id"
             )
 
+        if limit == 0:
+            return {}, from_stream_id
+
         user_ids_to_query: Set[str] = set()
         device_ids_to_query: Set[str] = set()
 
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index b085c50356..c3c0ac3849 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -28,7 +28,16 @@ from synapse.api.constants import (
     ReceiptTypes,
     RelationTypes,
 )
-from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
+from synapse.rest.client import (
+    devices,
+    knock,
+    login,
+    read_marker,
+    receipts,
+    room,
+    sendtodevice,
+    sync,
+)
 from synapse.server import HomeServer
 from synapse.types import JsonDict
 from synapse.util import Clock
@@ -948,3 +957,119 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase):
 
         self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["invite"])
         self.assertIn(self.included_room_id, channel.json_body["rooms"]["invite"])
+
+
+class ToDeviceLimitTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        sendtodevice.register_servlets,
+        sync.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.next_batch = "s0"
+
+        # Register the first user (used to check the received to_device messages).
+        self.user_id = self.register_user("kermit", "monkey")
+        self.tok = self.login("kermit", "monkey")
+
+        # Register the second user (used to send to_device messages to user1device).
+        self.user2 = self.register_user("kermit2", "monkey")
+        self.tok2 = self.login("kermit2", "monkey")
+
+        self.tx_id = 0
+
+    # This will send one to_device message from kermit device to all kermit2 devices
+    def _send_to_device(self) -> None:
+        self.tx_id += 1
+        chan = self.make_request(
+            "PUT",
+            f"/_matrix/client/v3/sendToDevice/m.test/{self.tx_id}",
+            content={"messages": {self.user_id: {"*": {"tx_id": self.tx_id}}}},
+            access_token=self.tok2,
+        )
+        self.assertEqual(chan.code, 200, chan.result)
+
+    # This does an incremental sync for user kermit with do_not_use_to_device_limit
+    # setted and check the number of returned to_device msgs against
+    # expected_to_device_msgs value
+    def _limited_sync_and_check(
+        self, to_device_limit: int, expected_to_device_msgs: int
+    ) -> None:
+        channel = self.make_request(
+            "GET",
+            f'/sync?since={self.next_batch}&filter={{"do_not_use_to_device_limit": {to_device_limit}}}',
+            access_token=self.tok,
+        )
+        self.assertEqual(channel.code, 200)
+        self.next_batch = channel.json_body["next_batch"]
+
+        if expected_to_device_msgs > 0:
+            self.assertIn("to_device", channel.json_body)
+            self.assertIn("events", channel.json_body["to_device"])
+            self.assertEqual(
+                expected_to_device_msgs, len(channel.json_body["to_device"]["events"])
+            )
+
+    def test_to_device(self) -> None:
+        """Tests that to_device messages are correctly flowing to sync,
+        and that to_device_limit is ignored when the experimetal feature is not enabled.
+        """
+        channel = self.make_request(
+            "GET",
+            "/sync",
+            access_token=self.tok,
+        )
+        self.assertEqual(channel.code, 200)
+        self.next_batch = channel.json_body["next_batch"]
+
+        for _ in range(4):
+            self._send_to_device()
+
+        # 100 is the default limit, we should get back our 4 messages
+        self._limited_sync_and_check(100, 4)
+
+        for _ in range(4):
+            self._send_to_device()
+
+        # limit of 3 is setted but the experimental feature is not enabled,
+        # so we are still expecting 4 messages
+        self._limited_sync_and_check(3, 4)
+
+    @override_config(
+        {
+            "experimental_features": {
+                "to_device_limit_enabled": True,
+            }
+        }
+    )
+    def test_to_device_limit(self) -> None:
+        """Tests that to_device messages are correctly batched in incremental syncs
+        according to the specified to_device_limit. The limit can change between sync calls.
+        """
+        channel = self.make_request(
+            "GET",
+            "/sync",
+            access_token=self.tok,
+        )
+        self.assertEqual(channel.code, 200)
+        self.next_batch = channel.json_body["next_batch"]
+
+        for _ in range(8):
+            self._send_to_device()
+
+        self._limited_sync_and_check(3, 3)
+        self._limited_sync_and_check(4, 4)
+        self._limited_sync_and_check(0, 0)
+        self._limited_sync_and_check(3, 1)
+
+        self._limited_sync_and_check(3, 0)
+
+        for _ in range(1100):
+            self._send_to_device()
+
+        # This tests the hardcoded 1000 limit used to avoid
+        # overloading a server
+        self._limited_sync_and_check(2000, 1000)
+        self._limited_sync_and_check(2000, 100)