summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorreivilibre <oliverw@matrix.org>2023-01-10 11:17:59 +0000
committerGitHub <noreply@github.com>2023-01-10 11:17:59 +0000
commitba4ea7d13ffae53644b206222af95a5171faa27c (patch)
tree7867aabc7a90d7ad1b539c015db7115d50af1d8c /synapse/replication
parentAdd missing worker settings to shared configuration (#14748) (diff)
downloadsynapse-ba4ea7d13ffae53644b206222af95a5171faa27c.tar.xz
Batch up replication requests to request the resyncing of remote users's devices. (#14716)
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/devices.py74
1 files changed, 73 insertions, 1 deletions
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 7c4941c3d3..ea5c08e6cf 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -13,12 +13,13 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 
 from twisted.web.server import Request
 
 from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
+from synapse.logging.opentracing import active_span
 from synapse.replication.http._base import ReplicationEndpoint
 from synapse.types import JsonDict
 
@@ -84,6 +85,76 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
         return 200, user_devices
 
 
+class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint):
+    """Ask master to resync the device list for multiple users from the same
+    remote server by contacting their server.
+
+    This must happen on master so that the results can be correctly cached in
+    the database and streamed to workers.
+
+    Request format:
+
+        POST /_synapse/replication/multi_user_device_resync
+
+        {
+            "user_ids": ["@alice:example.org", "@bob:example.org", ...]
+        }
+
+    Response is roughly equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
+    response, but there is a map from user ID to response, e.g.:
+
+        {
+            "@alice:example.org": {
+                "devices": [
+                    {
+                        "device_id": "JLAFKJWSCS",
+                        "keys": { ... },
+                        "device_display_name": "Alice's Mobile Phone"
+                    }
+                ]
+            },
+            ...
+        }
+    """
+
+    NAME = "multi_user_device_resync"
+    PATH_ARGS = ()
+    CACHE = False
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        from synapse.handlers.device import DeviceHandler
+
+        handler = hs.get_device_handler()
+        assert isinstance(handler, DeviceHandler)
+        self.device_list_updater = handler.device_list_updater
+
+        self.store = hs.get_datastores().main
+        self.clock = hs.get_clock()
+
+    @staticmethod
+    async def _serialize_payload(user_ids: List[str]) -> JsonDict:  # type: ignore[override]
+        return {"user_ids": user_ids}
+
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request
+    ) -> Tuple[int, Dict[str, Optional[JsonDict]]]:
+        content = parse_json_object_from_request(request)
+        user_ids: List[str] = content["user_ids"]
+
+        logger.info("Resync for %r", user_ids)
+        span = active_span()
+        if span:
+            span.set_tag("user_ids", f"{user_ids!r}")
+
+        multi_user_devices = await self.device_list_updater.multi_user_device_resync(
+            user_ids
+        )
+
+        return 200, multi_user_devices
+
+
 class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
     """Ask master to upload keys for the user and send them out over federation to
     update other servers.
@@ -151,4 +222,5 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
+    ReplicationMultiUserDevicesResyncRestServlet(hs).register(http_server)
     ReplicationUploadKeysForUserRestServlet(hs).register(http_server)