summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--scripts-dev/federation_client.py1
-rw-r--r--synapse/federation/federation_server.py20
-rw-r--r--synapse/federation/transport/server.py4
-rw-r--r--synapse/handlers/e2e_keys.py130
-rw-r--r--synapse/rest/client/v2_alpha/keys.py46
-rw-r--r--synapse/server.py45
-rw-r--r--synapse/server.pyi4
-rw-r--r--synapse/storage/events.py38
8 files changed, 186 insertions, 102 deletions
diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py
index caa3cee4e7..59c3dce3d7 100644
--- a/scripts-dev/federation_client.py
+++ b/scripts-dev/federation_client.py
@@ -128,6 +128,7 @@ def get_json(origin_name, origin_key, destination, path):
         headers={"Authorization": authorization_headers[0]},
         verify=False,
     )
+    sys.stderr.write("Status Code: %d\n" % (result.status_code,))
     return result.json()
 
 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 612d274bdb..8ec5b190c8 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -372,27 +372,9 @@ class FederationServer(FederationBase):
             (200, send_content)
         )
 
-    @defer.inlineCallbacks
     @log_function
     def on_query_client_keys(self, origin, content):
-        query = []
-        for user_id, device_ids in content.get("device_keys", {}).items():
-            if not device_ids:
-                query.append((user_id, None))
-            else:
-                for device_id in device_ids:
-                    query.append((user_id, device_id))
-
-        results = yield self.store.get_e2e_device_keys(query)
-
-        json_result = {}
-        for user_id, device_keys in results.items():
-            for device_id, json_bytes in device_keys.items():
-                json_result.setdefault(user_id, {})[device_id] = json.loads(
-                    json_bytes
-                )
-
-        defer.returnValue({"device_keys": json_result})
+        return self.on_query_request("client_keys", content)
 
     @defer.inlineCallbacks
     @log_function
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 26fa88ae84..1a88413d18 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -367,10 +367,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
 class FederationClientKeysQueryServlet(BaseFederationServlet):
     PATH = "/user/keys/query"
 
-    @defer.inlineCallbacks
     def on_POST(self, origin, content, query):
-        response = yield self.handler.on_query_client_keys(origin, content)
-        defer.returnValue((200, response))
+        return self.handler.on_query_client_keys(origin, content)
 
 
 class FederationClientKeysClaimServlet(BaseFederationServlet):
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
new file mode 100644
index 0000000000..1312cdf5ab
--- /dev/null
+++ b/synapse/handlers/e2e_keys.py
@@ -0,0 +1,130 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import json
+import logging
+
+from twisted.internet import defer
+
+from synapse.api import errors
+import synapse.types
+
+logger = logging.getLogger(__name__)
+
+
+class E2eKeysHandler(object):
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.federation = hs.get_replication_layer()
+        self.is_mine_id = hs.is_mine_id
+        self.server_name = hs.hostname
+
+        # doesn't really work as part of the generic query API, because the
+        # query request requires an object POST, but we abuse the
+        # "query handler" interface.
+        self.federation.register_query_handler(
+            "client_keys", self.on_federation_query_client_keys
+        )
+
+    @defer.inlineCallbacks
+    def query_devices(self, query_body):
+        """ Handle a device key query from a client
+
+        {
+            "device_keys": {
+                "<user_id>": ["<device_id>"]
+            }
+        }
+        ->
+        {
+            "device_keys": {
+                "<user_id>": {
+                    "<device_id>": {
+                        ...
+                    }
+                }
+            }
+        }
+        """
+        device_keys_query = query_body.get("device_keys", {})
+
+        # separate users by domain.
+        # make a map from domain to user_id to device_ids
+        queries_by_domain = collections.defaultdict(dict)
+        for user_id, device_ids in device_keys_query.items():
+            user = synapse.types.UserID.from_string(user_id)
+            queries_by_domain[user.domain][user_id] = device_ids
+
+        # do the queries
+        # TODO: do these in parallel
+        results = {}
+        for destination, destination_query in queries_by_domain.items():
+            if destination == self.server_name:
+                res = yield self.query_local_devices(destination_query)
+            else:
+                res = yield self.federation.query_client_keys(
+                    destination, {"device_keys": destination_query}
+                )
+                res = res["device_keys"]
+            for user_id, keys in res.items():
+                if user_id in destination_query:
+                    results[user_id] = keys
+
+        defer.returnValue((200, {"device_keys": results}))
+
+    @defer.inlineCallbacks
+    def query_local_devices(self, query):
+        """Get E2E device keys for local users
+
+        Args:
+            query (dict[string, list[string]|None): map from user_id to a list
+                 of devices to query (None for all devices)
+
+        Returns:
+            defer.Deferred: (resolves to dict[string, dict[string, dict]]):
+                 map from user_id -> device_id -> device details
+        """
+        local_query = []
+
+        for user_id, device_ids in query.items():
+            if not self.is_mine_id(user_id):
+                logger.warning("Request for keys for non-local user %s",
+                               user_id)
+                raise errors.SynapseError(400, "Not a user here")
+
+            if not device_ids:
+                local_query.append((user_id, None))
+            else:
+                for device_id in device_ids:
+                    local_query.append((user_id, device_id))
+
+        results = yield self.store.get_e2e_device_keys(local_query)
+
+        # un-jsonify the results
+        json_result = collections.defaultdict(dict)
+        for user_id, device_keys in results.items():
+            for device_id, json_bytes in device_keys.items():
+                json_result[user_id][device_id] = json.loads(json_bytes)
+
+        defer.returnValue(json_result)
+
+    @defer.inlineCallbacks
+    def on_federation_query_client_keys(self, query_body):
+        """ Handle a device key query from a federated server
+        """
+        device_keys_query = query_body.get("device_keys", {})
+        res = yield self.query_local_devices(device_keys_query)
+        defer.returnValue({"device_keys": res})
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 5fa33aceea..c5ff16adf3 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -184,17 +184,19 @@ class KeyQueryServlet(RestServlet):
     )
 
     def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer):
+        """
         super(KeyQueryServlet, self).__init__()
-        self.store = hs.get_datastore()
         self.auth = hs.get_auth()
-        self.federation = hs.get_replication_layer()
-        self.is_mine = hs.is_mine
+        self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request, user_id, device_id):
         yield self.auth.get_user_by_req(request)
         body = parse_json_object_from_request(request)
-        result = yield self.handle_request(body)
+        result = yield self.e2e_keys_handler.query_devices(body)
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -203,45 +205,11 @@ class KeyQueryServlet(RestServlet):
         auth_user_id = requester.user.to_string()
         user_id = user_id if user_id else auth_user_id
         device_ids = [device_id] if device_id else []
-        result = yield self.handle_request(
+        result = yield self.e2e_keys_handler.query_devices(
             {"device_keys": {user_id: device_ids}}
         )
         defer.returnValue(result)
 
-    @defer.inlineCallbacks
-    def handle_request(self, body):
-        local_query = []
-        remote_queries = {}
-        for user_id, device_ids in body.get("device_keys", {}).items():
-            user = UserID.from_string(user_id)
-            if self.is_mine(user):
-                if not device_ids:
-                    local_query.append((user_id, None))
-                else:
-                    for device_id in device_ids:
-                        local_query.append((user_id, device_id))
-            else:
-                remote_queries.setdefault(user.domain, {})[user_id] = list(
-                    device_ids
-                )
-        results = yield self.store.get_e2e_device_keys(local_query)
-
-        json_result = {}
-        for user_id, device_keys in results.items():
-            for device_id, json_bytes in device_keys.items():
-                json_result.setdefault(user_id, {})[device_id] = json.loads(
-                    json_bytes
-                )
-
-        for destination, device_keys in remote_queries.items():
-            remote_result = yield self.federation.query_client_keys(
-                destination, {"device_keys": device_keys}
-            )
-            for user_id, keys in remote_result["device_keys"].items():
-                if user_id in device_keys:
-                    json_result[user_id] = keys
-        defer.returnValue((200, {"device_keys": json_result}))
-
 
 class OneTimeKeyServlet(RestServlet):
     """
diff --git a/synapse/server.py b/synapse/server.py
index e8b166990d..6bb4988309 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -19,39 +19,38 @@
 # partial one for unit test mocking.
 
 # Imports required for the default HomeServer() implementation
-from twisted.web.client import BrowserLikePolicyForHTTPS
+import logging
+
 from twisted.enterprise import adbapi
+from twisted.web.client import BrowserLikePolicyForHTTPS
 
-from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.api.auth import Auth
+from synapse.api.filtering import Filtering
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.appservice.api import ApplicationServiceApi
+from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.crypto.keyring import Keyring
+from synapse.events.builder import EventBuilderFactory
 from synapse.federation import initialize_http_replication
-from synapse.handlers.device import DeviceHandler
-from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
-from synapse.notifier import Notifier
-from synapse.api.auth import Auth
 from synapse.handlers import Handlers
+from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.handlers.auth import AuthHandler
+from synapse.handlers.device import DeviceHandler
+from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.presence import PresenceHandler
+from synapse.handlers.room import RoomListHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
-from synapse.handlers.room import RoomListHandler
-from synapse.handlers.auth import AuthHandler
-from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
+from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.notifier import Notifier
+from synapse.push.pusherpool import PusherPool
+from synapse.rest.media.v1.media_repository import MediaRepository
 from synapse.state import StateHandler
 from synapse.storage import DataStore
+from synapse.streams.events import EventSources
 from synapse.util import Clock
 from synapse.util.distributor import Distributor
-from synapse.streams.events import EventSources
-from synapse.api.ratelimiting import Ratelimiter
-from synapse.crypto.keyring import Keyring
-from synapse.push.pusherpool import PusherPool
-from synapse.events.builder import EventBuilderFactory
-from synapse.api.filtering import Filtering
-from synapse.rest.media.v1.media_repository import MediaRepository
-
-from synapse.http.matrixfederationclient import MatrixFederationHttpClient
-
-import logging
-
 
 logger = logging.getLogger(__name__)
 
@@ -94,6 +93,7 @@ class HomeServer(object):
         'room_list_handler',
         'auth_handler',
         'device_handler',
+        'e2e_keys_handler',
         'application_service_api',
         'application_service_scheduler',
         'application_service_handler',
@@ -202,6 +202,9 @@ class HomeServer(object):
     def build_device_handler(self):
         return DeviceHandler(self)
 
+    def build_e2e_keys_handler(self):
+        return E2eKeysHandler(self)
+
     def build_application_service_api(self):
         return ApplicationServiceApi(self)
 
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 902f725c06..c0aa868c4f 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -1,6 +1,7 @@
 import synapse.handlers
 import synapse.handlers.auth
 import synapse.handlers.device
+import synapse.handlers.e2e_keys
 import synapse.storage
 import synapse.state
 
@@ -14,6 +15,9 @@ class HomeServer(object):
     def get_device_handler(self) -> synapse.handlers.device.DeviceHandler:
         pass
 
+    def get_e2e_keys_handler(self) -> synapse.handlers.e2e_keys.E2eKeysHandler:
+        pass
+
     def get_handlers(self) -> synapse.handlers.Handlers:
         pass
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c63ca36df6..4664cfe6d9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -26,7 +26,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 
 from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple
+from collections import deque, namedtuple, OrderedDict
 
 import synapse
 import synapse.metrics
@@ -403,6 +403,23 @@ class EventsStore(SQLBaseStore):
         and the rejections table. Things reading from those table will need to check
         whether the event was rejected.
         """
+        # Ensure that we don't have the same event twice.
+        # Pick the earliest non-outlier if there is one, else the earliest one.
+        new_events_and_contexts = OrderedDict()
+        for event, context in events_and_contexts:
+            prev_event_context = new_events_and_contexts.get(event.event_id)
+            if prev_event_context:
+                if not event.internal_metadata.is_outlier():
+                    if prev_event_context[0].internal_metadata.is_outlier():
+                        # To ensure correct ordering we pop, as OrderedDict is
+                        # ordered by first insertion.
+                        new_events_and_contexts.pop(event.event_id, None)
+                        new_events_and_contexts[event.event_id] = (event, context)
+            else:
+                new_events_and_contexts[event.event_id] = (event, context)
+
+        events_and_contexts = new_events_and_contexts.values()
+
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
@@ -433,8 +450,6 @@ class EventsStore(SQLBaseStore):
             for event_id, outlier in txn.fetchall()
         }
 
-        # Remove the events that we've seen before.
-        event_map = {}
         to_remove = set()
         for event, context in events_and_contexts:
             if context.rejected:
@@ -445,23 +460,6 @@ class EventsStore(SQLBaseStore):
                     to_remove.add(event)
                 continue
 
-            # Handle the case of the list including the same event multiple
-            # times. The tricky thing here is when they differ by whether
-            # they are an outlier.
-            if event.event_id in event_map:
-                other = event_map[event.event_id]
-
-                if not other.internal_metadata.is_outlier():
-                    to_remove.add(event)
-                    continue
-                elif not event.internal_metadata.is_outlier():
-                    to_remove.add(event)
-                    continue
-                else:
-                    to_remove.add(other)
-
-            event_map[event.event_id] = event
-
             if event.event_id not in have_persisted:
                 continue