summary refs log tree commit diff
path: root/synapse/handlers/e2e_keys.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/e2e_keys.py')
-rw-r--r--synapse/handlers/e2e_keys.py115
1 files changed, 90 insertions, 25 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 73a14cf952..9c7e9494d6 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -13,12 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import collections
 import json
 import logging
 
 from twisted.internet import defer
 
+from synapse.api import errors
 import synapse.types
+
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
@@ -29,39 +32,101 @@ class E2eKeysHandler(BaseHandler):
         super(E2eKeysHandler, self).__init__(hs)
         self.store = hs.get_datastore()
         self.federation = hs.get_replication_layer()
-        self.is_mine = hs.is_mine
+        self.is_mine_id = hs.is_mine_id
+
+        # doesn't really work as part of the generic query API, because the
+        # query request requires an object POST, but we abuse the
+        # "query handler" interface.
+        self.federation.register_query_handler(
+            "client_keys", self.on_federation_query_client_keys
+        )
 
     @defer.inlineCallbacks
     def query_devices(self, query_body):
-        local_query = []
-        remote_queries = {}
-        for user_id, device_ids in query_body.get("device_keys", {}).items():
+        """ Handle a device key query from a client
+
+        {
+            "device_keys": {
+                "<user_id>": ["<device_id>"]
+            }
+        }
+        ->
+        {
+            "device_keys": {
+                "<user_id>": {
+                    "<device_id>": {
+                        ...
+                    }
+                }
+            }
+        }
+        """
+        device_keys_query = query_body.get("device_keys", {})
+
+        # separate users by domain.
+        # make a map from domain to user_id to device_ids
+        queries_by_domain = collections.defaultdict(dict)
+        for user_id, device_ids in device_keys_query.items():
             user = synapse.types.UserID.from_string(user_id)
-            if self.is_mine(user):
-                if not device_ids:
-                    local_query.append((user_id, None))
-                else:
-                    for device_id in device_ids:
-                        local_query.append((user_id, device_id))
+            queries_by_domain[user.domain][user_id] = device_ids
+
+        # do the queries
+        # TODO: do these in parallel
+        results = {}
+        for destination, destination_query in queries_by_domain.items():
+            if destination == self.hs.hostname:
+                res = yield self.query_local_devices(destination_query)
             else:
-                remote_queries.setdefault(user.domain, {})[user_id] = list(
-                    device_ids
+                res = yield self.federation.query_client_keys(
+                    destination, {"device_keys": destination_query}
                 )
+                res = res["device_keys"]
+            for user_id, keys in res.items():
+                if user_id in destination_query:
+                    results[user_id] = keys
+
+        defer.returnValue((200, {"device_keys": results}))
+
+    @defer.inlineCallbacks
+    def query_local_devices(self, query):
+        """Get E2E device keys for local users
+
+        Args:
+            query (dict[string, list[string]|None): map from user_id to a list
+                 of devices to query (None for all devices)
+
+        Returns:
+            defer.Deferred: (resolves to dict[string, dict[string, dict]]):
+                 map from user_id -> device_id -> device details
+        """
+        local_query = []
+
+        for user_id, device_ids in query.items():
+            if not self.is_mine_id(user_id):
+                logger.warning("Request for keys for non-local user %s",
+                               user_id)
+                raise errors.SynapseError(400, "Not a user here")
+
+            if not device_ids:
+                local_query.append((user_id, None))
+            else:
+                for device_id in device_ids:
+                    local_query.append((user_id, device_id))
+
         results = yield self.store.get_e2e_device_keys(local_query)
 
-        json_result = {}
+        # un-jsonify the results
+        json_result = collections.defaultdict(dict)
         for user_id, device_keys in results.items():
             for device_id, json_bytes in device_keys.items():
-                json_result.setdefault(user_id, {})[
-                    device_id] = json.loads(
-                    json_bytes
-                )
+                json_result[user_id][device_id] = json.loads(json_bytes)
 
-        for destination, device_keys in remote_queries.items():
-            remote_result = yield self.federation.query_client_keys(
-                destination, {"device_keys": device_keys}
-            )
-            for user_id, keys in remote_result["device_keys"].items():
-                if user_id in device_keys:
-                    json_result[user_id] = keys
-        defer.returnValue((200, {"device_keys": json_result}))
+        defer.returnValue(json_result)
+
+    @defer.inlineCallbacks
+    def on_federation_query_client_keys(self, query_body):
+        """ Handle a device key query from a federated server
+        """
+        device_keys_query = query_body.get("device_keys", {})
+        res = yield self.query_local_devices(device_keys_query)
+        defer.returnValue({"device_keys": res})