diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 7c4941c3d3..ea5c08e6cf 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -13,12 +13,13 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from twisted.web.server import Request
from synapse.http.server import HttpServer
from synapse.http.servlet import parse_json_object_from_request
+from synapse.logging.opentracing import active_span
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict
@@ -84,6 +85,76 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
return 200, user_devices
+class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint):
+ """Ask master to resync the device list for multiple users from the same
+ remote server by contacting their server.
+
+ This must happen on master so that the results can be correctly cached in
+ the database and streamed to workers.
+
+ Request format:
+
+ POST /_synapse/replication/multi_user_device_resync
+
+ {
+ "user_ids": ["@alice:example.org", "@bob:example.org", ...]
+ }
+
+ Response is roughly equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
+ response, but there is a map from user ID to response, e.g.:
+
+ {
+ "@alice:example.org": {
+ "devices": [
+ {
+ "device_id": "JLAFKJWSCS",
+ "keys": { ... },
+ "device_display_name": "Alice's Mobile Phone"
+ }
+ ]
+ },
+ ...
+ }
+ """
+
+ NAME = "multi_user_device_resync"
+ PATH_ARGS = ()
+ CACHE = False
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ from synapse.handlers.device import DeviceHandler
+
+ handler = hs.get_device_handler()
+ assert isinstance(handler, DeviceHandler)
+ self.device_list_updater = handler.device_list_updater
+
+ self.store = hs.get_datastores().main
+ self.clock = hs.get_clock()
+
+ @staticmethod
+ async def _serialize_payload(user_ids: List[str]) -> JsonDict: # type: ignore[override]
+ return {"user_ids": user_ids}
+
+ async def _handle_request( # type: ignore[override]
+ self, request: Request
+ ) -> Tuple[int, Dict[str, Optional[JsonDict]]]:
+ content = parse_json_object_from_request(request)
+ user_ids: List[str] = content["user_ids"]
+
+ logger.info("Resync for %r", user_ids)
+ span = active_span()
+ if span:
+ span.set_tag("user_ids", f"{user_ids!r}")
+
+ multi_user_devices = await self.device_list_updater.multi_user_device_resync(
+ user_ids
+ )
+
+ return 200, multi_user_devices
+
+
class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
"""Ask master to upload keys for the user and send them out over federation to
update other servers.
@@ -151,4 +222,5 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
+ ReplicationMultiUserDevicesResyncRestServlet(hs).register(http_server)
ReplicationUploadKeysForUserRestServlet(hs).register(http_server)
|