summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-11-26 12:56:05 +0000
committerGitHub <noreply@github.com>2019-11-26 12:56:05 +0000
commitf9f1c8acbb04ff1623ca981e76bee2d124c54858 (patch)
tree965aaa961d87a4dadc838b41f8cfa574e239bcc1 /synapse
parentMake sure that we close cursors before returning from a query (#6408) (diff)
parentFixup docs (diff)
downloadsynapse-f9f1c8acbb04ff1623ca981e76bee2d124c54858.tar.xz
Merge pull request #6332 from matrix-org/erikj/query_devices_fix
Fix caching devices for remote servers in worker.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/e2e_keys.py19
-rw-r--r--synapse/replication/http/__init__.py10
-rw-r--r--synapse/replication/http/devices.py73
3 files changed, 98 insertions, 4 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index f09a0b73c8..28c12753c1 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -30,6 +30,7 @@ from twisted.internet import defer
 from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
+from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.types import (
     UserID,
     get_domain_from_id,
@@ -53,6 +54,12 @@ class E2eKeysHandler(object):
 
         self._edu_updater = SigningKeyEduUpdater(hs, self)
 
+        self._is_master = hs.config.worker_app is None
+        if not self._is_master:
+            self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
+                hs
+            )
+
         federation_registry = hs.get_federation_registry()
 
         # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
@@ -191,9 +198,15 @@ class E2eKeysHandler(object):
                 # probably be tracking their device lists. However, we haven't
                 # done an initial sync on the device list so we do it now.
                 try:
-                    user_devices = yield self.device_handler.device_list_updater.user_device_resync(
-                        user_id
-                    )
+                    if self._is_master:
+                        user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+                            user_id
+                        )
+                    else:
+                        user_devices = yield self._user_device_resync_client(
+                            user_id=user_id
+                        )
+
                     user_devices = user_devices["devices"]
                     for device in user_devices:
                         results[user_id] = {device["device_id"]: device["keys"]}
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 81b85352b1..28dbc6fcba 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,14 @@
 # limitations under the License.
 
 from synapse.http.server import JsonResource
-from synapse.replication.http import federation, login, membership, register, send_event
+from synapse.replication.http import (
+    devices,
+    federation,
+    login,
+    membership,
+    register,
+    send_event,
+)
 
 REPLICATION_PREFIX = "/_synapse/replication"
 
@@ -30,3 +37,4 @@ class ReplicationRestResource(JsonResource):
         federation.register_servlets(hs, self)
         login.register_servlets(hs, self)
         register.register_servlets(hs, self)
+        devices.register_servlets(hs, self)
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
new file mode 100644
index 0000000000..e32aac0a25
--- /dev/null
+++ b/synapse/replication/http/devices.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
+    """Ask master to resync the device list for a user by contacting their
+    server.
+
+    This must happen on master so that the results can be correctly cached in
+    the database and streamed to workers.
+
+    Request format:
+
+        POST /_synapse/replication/user_device_resync/:user_id
+
+        {}
+
+    Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
+    response, e.g.:
+
+        {
+            "user_id": "@alice:example.org",
+            "devices": [
+                {
+                    "device_id": "JLAFKJWSCS",
+                    "keys": { ... },
+                    "device_display_name": "Alice's Mobile Phone"
+                }
+            ]
+        }
+    """
+
+    NAME = "user_device_resync"
+    PATH_ARGS = ("user_id",)
+    CACHE = False
+
+    def __init__(self, hs):
+        super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs)
+
+        self.device_list_updater = hs.get_device_handler().device_list_updater
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @staticmethod
+    def _serialize_payload(user_id):
+        return {}
+
+    async def _handle_request(self, request, user_id):
+        user_devices = await self.device_list_updater.user_device_resync(user_id)
+
+        return 200, user_devices
+
+
+def register_servlets(hs, http_server):
+    ReplicationUserDevicesResyncRestServlet(hs).register(http_server)