diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 612d274bdb..8ec5b190c8 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -372,27 +372,9 @@ class FederationServer(FederationBase):
(200, send_content)
)
- @defer.inlineCallbacks
@log_function
def on_query_client_keys(self, origin, content):
- query = []
- for user_id, device_ids in content.get("device_keys", {}).items():
- if not device_ids:
- query.append((user_id, None))
- else:
- for device_id in device_ids:
- query.append((user_id, device_id))
-
- results = yield self.store.get_e2e_device_keys(query)
-
- json_result = {}
- for user_id, device_keys in results.items():
- for device_id, json_bytes in device_keys.items():
- json_result.setdefault(user_id, {})[device_id] = json.loads(
- json_bytes
- )
-
- defer.returnValue({"device_keys": json_result})
+ return self.on_query_request("client_keys", content)
@defer.inlineCallbacks
@log_function
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 26fa88ae84..1a88413d18 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -367,10 +367,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
class FederationClientKeysQueryServlet(BaseFederationServlet):
PATH = "/user/keys/query"
- @defer.inlineCallbacks
def on_POST(self, origin, content, query):
- response = yield self.handler.on_query_client_keys(origin, content)
- defer.returnValue((200, response))
+ return self.handler.on_query_client_keys(origin, content)
class FederationClientKeysClaimServlet(BaseFederationServlet):
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
new file mode 100644
index 0000000000..1312cdf5ab
--- /dev/null
+++ b/synapse/handlers/e2e_keys.py
@@ -0,0 +1,130 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import json
+import logging
+
+from twisted.internet import defer
+
+from synapse.api import errors
+import synapse.types
+
+logger = logging.getLogger(__name__)
+
+
+class E2eKeysHandler(object):
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ self.federation = hs.get_replication_layer()
+ self.is_mine_id = hs.is_mine_id
+ self.server_name = hs.hostname
+
+ # doesn't really work as part of the generic query API, because the
+ # query request requires an object POST, but we abuse the
+ # "query handler" interface.
+ self.federation.register_query_handler(
+ "client_keys", self.on_federation_query_client_keys
+ )
+
+ @defer.inlineCallbacks
+ def query_devices(self, query_body):
+ """ Handle a device key query from a client
+
+ {
+ "device_keys": {
+ "<user_id>": ["<device_id>"]
+ }
+ }
+ ->
+ {
+ "device_keys": {
+ "<user_id>": {
+ "<device_id>": {
+ ...
+ }
+ }
+ }
+ }
+ """
+ device_keys_query = query_body.get("device_keys", {})
+
+ # separate users by domain.
+ # make a map from domain to user_id to device_ids
+ queries_by_domain = collections.defaultdict(dict)
+ for user_id, device_ids in device_keys_query.items():
+ user = synapse.types.UserID.from_string(user_id)
+ queries_by_domain[user.domain][user_id] = device_ids
+
+ # do the queries
+ # TODO: do these in parallel
+ results = {}
+ for destination, destination_query in queries_by_domain.items():
+ if destination == self.server_name:
+ res = yield self.query_local_devices(destination_query)
+ else:
+ res = yield self.federation.query_client_keys(
+ destination, {"device_keys": destination_query}
+ )
+ res = res["device_keys"]
+ for user_id, keys in res.items():
+ if user_id in destination_query:
+ results[user_id] = keys
+
+ defer.returnValue((200, {"device_keys": results}))
+
+ @defer.inlineCallbacks
+ def query_local_devices(self, query):
+ """Get E2E device keys for local users
+
+ Args:
+ query (dict[string, list[string]|None): map from user_id to a list
+ of devices to query (None for all devices)
+
+ Returns:
+ defer.Deferred: (resolves to dict[string, dict[string, dict]]):
+ map from user_id -> device_id -> device details
+ """
+ local_query = []
+
+ for user_id, device_ids in query.items():
+ if not self.is_mine_id(user_id):
+ logger.warning("Request for keys for non-local user %s",
+ user_id)
+ raise errors.SynapseError(400, "Not a user here")
+
+ if not device_ids:
+ local_query.append((user_id, None))
+ else:
+ for device_id in device_ids:
+ local_query.append((user_id, device_id))
+
+ results = yield self.store.get_e2e_device_keys(local_query)
+
+ # un-jsonify the results
+ json_result = collections.defaultdict(dict)
+ for user_id, device_keys in results.items():
+ for device_id, json_bytes in device_keys.items():
+ json_result[user_id][device_id] = json.loads(json_bytes)
+
+ defer.returnValue(json_result)
+
+ @defer.inlineCallbacks
+ def on_federation_query_client_keys(self, query_body):
+ """ Handle a device key query from a federated server
+ """
+ device_keys_query = query_body.get("device_keys", {})
+ res = yield self.query_local_devices(device_keys_query)
+ defer.returnValue({"device_keys": res})
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index dc1d4d8fc6..705a0b6c17 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -186,17 +186,19 @@ class KeyQueryServlet(RestServlet):
)
def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer):
+ """
super(KeyQueryServlet, self).__init__()
- self.store = hs.get_datastore()
self.auth = hs.get_auth()
- self.federation = hs.get_replication_layer()
- self.is_mine = hs.is_mine
+ self.e2e_keys_handler = hs.get_e2e_keys_handler()
@defer.inlineCallbacks
def on_POST(self, request, user_id, device_id):
yield self.auth.get_user_by_req(request)
body = parse_json_object_from_request(request)
- result = yield self.handle_request(body)
+ result = yield self.e2e_keys_handler.query_devices(body)
defer.returnValue(result)
@defer.inlineCallbacks
@@ -205,45 +207,11 @@ class KeyQueryServlet(RestServlet):
auth_user_id = requester.user.to_string()
user_id = user_id if user_id else auth_user_id
device_ids = [device_id] if device_id else []
- result = yield self.handle_request(
+ result = yield self.e2e_keys_handler.query_devices(
{"device_keys": {user_id: device_ids}}
)
defer.returnValue(result)
- @defer.inlineCallbacks
- def handle_request(self, body):
- local_query = []
- remote_queries = {}
- for user_id, device_ids in body.get("device_keys", {}).items():
- user = UserID.from_string(user_id)
- if self.is_mine(user):
- if not device_ids:
- local_query.append((user_id, None))
- else:
- for device_id in device_ids:
- local_query.append((user_id, device_id))
- else:
- remote_queries.setdefault(user.domain, {})[user_id] = list(
- device_ids
- )
- results = yield self.store.get_e2e_device_keys(local_query)
-
- json_result = {}
- for user_id, device_keys in results.items():
- for device_id, json_bytes in device_keys.items():
- json_result.setdefault(user_id, {})[device_id] = json.loads(
- json_bytes
- )
-
- for destination, device_keys in remote_queries.items():
- remote_result = yield self.federation.query_client_keys(
- destination, {"device_keys": device_keys}
- )
- for user_id, keys in remote_result["device_keys"].items():
- if user_id in device_keys:
- json_result[user_id] = keys
- defer.returnValue((200, {"device_keys": json_result}))
-
class OneTimeKeyServlet(RestServlet):
"""
diff --git a/synapse/server.py b/synapse/server.py
index e8b166990d..6bb4988309 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -19,39 +19,38 @@
# partial one for unit test mocking.
# Imports required for the default HomeServer() implementation
-from twisted.web.client import BrowserLikePolicyForHTTPS
+import logging
+
from twisted.enterprise import adbapi
+from twisted.web.client import BrowserLikePolicyForHTTPS
-from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.api.auth import Auth
+from synapse.api.filtering import Filtering
+from synapse.api.ratelimiting import Ratelimiter
from synapse.appservice.api import ApplicationServiceApi
+from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.crypto.keyring import Keyring
+from synapse.events.builder import EventBuilderFactory
from synapse.federation import initialize_http_replication
-from synapse.handlers.device import DeviceHandler
-from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
-from synapse.notifier import Notifier
-from synapse.api.auth import Auth
from synapse.handlers import Handlers
+from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.handlers.auth import AuthHandler
+from synapse.handlers.device import DeviceHandler
+from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.presence import PresenceHandler
+from synapse.handlers.room import RoomListHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
-from synapse.handlers.room import RoomListHandler
-from synapse.handlers.auth import AuthHandler
-from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
+from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.notifier import Notifier
+from synapse.push.pusherpool import PusherPool
+from synapse.rest.media.v1.media_repository import MediaRepository
from synapse.state import StateHandler
from synapse.storage import DataStore
+from synapse.streams.events import EventSources
from synapse.util import Clock
from synapse.util.distributor import Distributor
-from synapse.streams.events import EventSources
-from synapse.api.ratelimiting import Ratelimiter
-from synapse.crypto.keyring import Keyring
-from synapse.push.pusherpool import PusherPool
-from synapse.events.builder import EventBuilderFactory
-from synapse.api.filtering import Filtering
-from synapse.rest.media.v1.media_repository import MediaRepository
-
-from synapse.http.matrixfederationclient import MatrixFederationHttpClient
-
-import logging
-
logger = logging.getLogger(__name__)
@@ -94,6 +93,7 @@ class HomeServer(object):
'room_list_handler',
'auth_handler',
'device_handler',
+ 'e2e_keys_handler',
'application_service_api',
'application_service_scheduler',
'application_service_handler',
@@ -202,6 +202,9 @@ class HomeServer(object):
def build_device_handler(self):
return DeviceHandler(self)
+ def build_e2e_keys_handler(self):
+ return E2eKeysHandler(self)
+
def build_application_service_api(self):
return ApplicationServiceApi(self)
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 902f725c06..c0aa868c4f 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -1,6 +1,7 @@
import synapse.handlers
import synapse.handlers.auth
import synapse.handlers.device
+import synapse.handlers.e2e_keys
import synapse.storage
import synapse.state
@@ -14,6 +15,9 @@ class HomeServer(object):
def get_device_handler(self) -> synapse.handlers.device.DeviceHandler:
pass
+ def get_e2e_keys_handler(self) -> synapse.handlers.e2e_keys.E2eKeysHandler:
+ pass
+
def get_handlers(self) -> synapse.handlers.Handlers:
pass
|