summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/federation_client.py113
-rw-r--r--synapse/federation/federation_server.py43
-rw-r--r--synapse/federation/transport/client.py22
-rw-r--r--synapse/federation/transport/server.py16
-rw-r--r--synapse/handlers/device.py2
-rw-r--r--synapse/handlers/e2e_keys.py139
-rw-r--r--synapse/rest/client/v2_alpha/keys.py50
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py86
-rw-r--r--synapse/server.py45
-rw-r--r--synapse/server.pyi4
-rw-r--r--synapse/storage/end_to_end_keys.py60
-rw-r--r--synapse/storage/events.py116
-rw-r--r--synapse/storage/schema/delta/33/devices_for_e2e_keys.sql2
-rw-r--r--synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql20
14 files changed, 573 insertions, 145 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b06387051c..65778fd4ee 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -314,6 +314,40 @@ class FederationClient(FederationBase):
             Deferred: Results in a list of PDUs.
         """
 
+        try:
+            # First we try and ask for just the IDs, as thats far quicker if
+            # we have most of the state and auth_chain already.
+            # However, this may 404 if the other side has an old synapse.
+            result = yield self.transport_layer.get_room_state_ids(
+                destination, room_id, event_id=event_id,
+            )
+
+            state_event_ids = result["pdu_ids"]
+            auth_event_ids = result.get("auth_chain_ids", [])
+
+            fetched_events, failed_to_fetch = yield self.get_events(
+                [destination], room_id, set(state_event_ids + auth_event_ids)
+            )
+
+            if failed_to_fetch:
+                logger.warn("Failed to get %r", failed_to_fetch)
+
+            event_map = {
+                ev.event_id: ev for ev in fetched_events
+            }
+
+            pdus = [event_map[e_id] for e_id in state_event_ids]
+            auth_chain = [event_map[e_id] for e_id in auth_event_ids]
+
+            auth_chain.sort(key=lambda e: e.depth)
+
+            defer.returnValue((pdus, auth_chain))
+        except HttpResponseException as e:
+            if e.code == 400 or e.code == 404:
+                logger.info("Failed to use get_room_state_ids API, falling back")
+            else:
+                raise e
+
         result = yield self.transport_layer.get_room_state(
             destination, room_id, event_id=event_id,
         )
@@ -327,12 +361,26 @@ class FederationClient(FederationBase):
             for p in result.get("auth_chain", [])
         ]
 
+        seen_events = yield self.store.get_events([
+            ev.event_id for ev in itertools.chain(pdus, auth_chain)
+        ])
+
         signed_pdus = yield self._check_sigs_and_hash_and_fetch(
-            destination, pdus, outlier=True
+            destination,
+            [p for p in pdus if p.event_id not in seen_events],
+            outlier=True
+        )
+        signed_pdus.extend(
+            seen_events[p.event_id] for p in pdus if p.event_id in seen_events
         )
 
         signed_auth = yield self._check_sigs_and_hash_and_fetch(
-            destination, auth_chain, outlier=True
+            destination,
+            [p for p in auth_chain if p.event_id not in seen_events],
+            outlier=True
+        )
+        signed_auth.extend(
+            seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events
         )
 
         signed_auth.sort(key=lambda e: e.depth)
@@ -340,6 +388,67 @@ class FederationClient(FederationBase):
         defer.returnValue((signed_pdus, signed_auth))
 
     @defer.inlineCallbacks
+    def get_events(self, destinations, room_id, event_ids, return_local=True):
+        """Fetch events from some remote destinations, checking if we already
+        have them.
+
+        Args:
+            destinations (list)
+            room_id (str)
+            event_ids (list)
+            return_local (bool): Whether to include events we already have in
+                the DB in the returned list of events
+
+        Returns:
+            Deferred: A deferred resolving to a 2-tuple where the first is a list of
+            events and the second is a list of event ids that we failed to fetch.
+        """
+        if return_local:
+            seen_events = yield self.store.get_events(event_ids)
+            signed_events = seen_events.values()
+        else:
+            seen_events = yield self.store.have_events(event_ids)
+            signed_events = []
+
+        failed_to_fetch = set()
+
+        missing_events = set(event_ids)
+        for k in seen_events:
+            missing_events.discard(k)
+
+        if not missing_events:
+            defer.returnValue((signed_events, failed_to_fetch))
+
+        def random_server_list():
+            srvs = list(destinations)
+            random.shuffle(srvs)
+            return srvs
+
+        batch_size = 20
+        missing_events = list(missing_events)
+        for i in xrange(0, len(missing_events), batch_size):
+            batch = set(missing_events[i:i + batch_size])
+
+            deferreds = [
+                self.get_pdu(
+                    destinations=random_server_list(),
+                    event_id=e_id,
+                )
+                for e_id in batch
+            ]
+
+            res = yield defer.DeferredList(deferreds, consumeErrors=True)
+            for success, result in res:
+                if success:
+                    signed_events.append(result)
+                    batch.discard(result.event_id)
+
+            # We removed all events we successfully fetched from `batch`
+            failed_to_fetch.update(batch)
+
+        defer.returnValue((signed_events, failed_to_fetch))
+
+    @defer.inlineCallbacks
     @log_function
     def get_event_auth(self, destination, room_id, event_id):
         res = yield self.transport_layer.get_event_auth(
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 612d274bdb..aba19639c7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -215,6 +215,27 @@ class FederationServer(FederationBase):
         defer.returnValue((200, resp))
 
     @defer.inlineCallbacks
+    def on_state_ids_request(self, origin, room_id, event_id):
+        if not event_id:
+            raise NotImplementedError("Specify an event")
+
+        in_room = yield self.auth.check_host_in_room(room_id, origin)
+        if not in_room:
+            raise AuthError(403, "Host not in room.")
+
+        pdus = yield self.handler.get_state_for_pdu(
+            room_id, event_id,
+        )
+        auth_chain = yield self.store.get_auth_chain(
+            [pdu.event_id for pdu in pdus]
+        )
+
+        defer.returnValue((200, {
+            "pdu_ids": [pdu.event_id for pdu in pdus],
+            "auth_chain_ids": [pdu.event_id for pdu in auth_chain],
+        }))
+
+    @defer.inlineCallbacks
     def _on_context_state_request_compute(self, room_id, event_id):
         pdus = yield self.handler.get_state_for_pdu(
             room_id, event_id,
@@ -372,27 +393,9 @@ class FederationServer(FederationBase):
             (200, send_content)
         )
 
-    @defer.inlineCallbacks
     @log_function
     def on_query_client_keys(self, origin, content):
-        query = []
-        for user_id, device_ids in content.get("device_keys", {}).items():
-            if not device_ids:
-                query.append((user_id, None))
-            else:
-                for device_id in device_ids:
-                    query.append((user_id, device_id))
-
-        results = yield self.store.get_e2e_device_keys(query)
-
-        json_result = {}
-        for user_id, device_keys in results.items():
-            for device_id, json_bytes in device_keys.items():
-                json_result.setdefault(user_id, {})[device_id] = json.loads(
-                    json_bytes
-                )
-
-        defer.returnValue({"device_keys": json_result})
+        return self.on_query_request("client_keys", content)
 
     @defer.inlineCallbacks
     @log_function
@@ -602,7 +605,7 @@ class FederationServer(FederationBase):
                     origin, pdu.room_id, pdu.event_id,
                 )
             except:
-                logger.warn("Failed to get state for event: %s", pdu.event_id)
+                logger.exception("Failed to get state for event: %s", pdu.event_id)
 
         yield self.handler.on_receive_pdu(
             origin,
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index ebb698e278..3d088e43cb 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -55,6 +55,28 @@ class TransportLayerClient(object):
         )
 
     @log_function
+    def get_room_state_ids(self, destination, room_id, event_id):
+        """ Requests all state for a given room from the given server at the
+        given event. Returns the state's event_id's
+
+        Args:
+            destination (str): The host name of the remote home server we want
+                to get the state from.
+            context (str): The name of the context we want the state of
+            event_id (str): The event we want the context at.
+
+        Returns:
+            Deferred: Results in a dict received from the remote homeserver.
+        """
+        logger.debug("get_room_state_ids dest=%s, room=%s",
+                     destination, room_id)
+
+        path = PREFIX + "/state_ids/%s/" % room_id
+        return self.client.get_json(
+            destination, path=path, args={"event_id": event_id},
+        )
+
+    @log_function
     def get_event(self, destination, event_id, timeout=None):
         """ Requests the pdu with give id and origin from the given server.
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 26fa88ae84..0bc6e0801d 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -271,6 +271,17 @@ class FederationStateServlet(BaseFederationServlet):
         )
 
 
+class FederationStateIdsServlet(BaseFederationServlet):
+    PATH = "/state_ids/(?P<room_id>[^/]*)/"
+
+    def on_GET(self, origin, content, query, room_id):
+        return self.handler.on_state_ids_request(
+            origin,
+            room_id,
+            query.get("event_id", [None])[0],
+        )
+
+
 class FederationBackfillServlet(BaseFederationServlet):
     PATH = "/backfill/(?P<context>[^/]*)/"
 
@@ -367,10 +378,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
 class FederationClientKeysQueryServlet(BaseFederationServlet):
     PATH = "/user/keys/query"
 
-    @defer.inlineCallbacks
     def on_POST(self, origin, content, query):
-        response = yield self.handler.on_query_client_keys(origin, content)
-        defer.returnValue((200, response))
+        return self.handler.on_query_client_keys(origin, content)
 
 
 class FederationClientKeysClaimServlet(BaseFederationServlet):
@@ -538,6 +547,7 @@ SERVLET_CLASSES = (
     FederationPullServlet,
     FederationEventServlet,
     FederationStateServlet,
+    FederationStateIdsServlet,
     FederationBackfillServlet,
     FederationQueryServlet,
     FederationMakeJoinServlet,
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index f4bf159bb5..8d630c6b1a 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,7 +29,7 @@ class DeviceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def check_device_registered(self, user_id, device_id,
-                                initial_device_display_name):
+                                initial_device_display_name=None):
         """
         If the given device has not been registered, register it with the
         supplied display name.
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
new file mode 100644
index 0000000000..2c7bfd91ed
--- /dev/null
+++ b/synapse/handlers/e2e_keys.py
@@ -0,0 +1,139 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import json
+import logging
+
+from twisted.internet import defer
+
+from synapse.api import errors
+import synapse.types
+
+logger = logging.getLogger(__name__)
+
+
+class E2eKeysHandler(object):
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.federation = hs.get_replication_layer()
+        self.is_mine_id = hs.is_mine_id
+        self.server_name = hs.hostname
+
+        # doesn't really work as part of the generic query API, because the
+        # query request requires an object POST, but we abuse the
+        # "query handler" interface.
+        self.federation.register_query_handler(
+            "client_keys", self.on_federation_query_client_keys
+        )
+
+    @defer.inlineCallbacks
+    def query_devices(self, query_body):
+        """ Handle a device key query from a client
+
+        {
+            "device_keys": {
+                "<user_id>": ["<device_id>"]
+            }
+        }
+        ->
+        {
+            "device_keys": {
+                "<user_id>": {
+                    "<device_id>": {
+                        ...
+                    }
+                }
+            }
+        }
+        """
+        device_keys_query = query_body.get("device_keys", {})
+
+        # separate users by domain.
+        # make a map from domain to user_id to device_ids
+        queries_by_domain = collections.defaultdict(dict)
+        for user_id, device_ids in device_keys_query.items():
+            user = synapse.types.UserID.from_string(user_id)
+            queries_by_domain[user.domain][user_id] = device_ids
+
+        # do the queries
+        # TODO: do these in parallel
+        results = {}
+        for destination, destination_query in queries_by_domain.items():
+            if destination == self.server_name:
+                res = yield self.query_local_devices(destination_query)
+            else:
+                res = yield self.federation.query_client_keys(
+                    destination, {"device_keys": destination_query}
+                )
+                res = res["device_keys"]
+            for user_id, keys in res.items():
+                if user_id in destination_query:
+                    results[user_id] = keys
+
+        defer.returnValue((200, {"device_keys": results}))
+
+    @defer.inlineCallbacks
+    def query_local_devices(self, query):
+        """Get E2E device keys for local users
+
+        Args:
+            query (dict[string, list[string]|None): map from user_id to a list
+                 of devices to query (None for all devices)
+
+        Returns:
+            defer.Deferred: (resolves to dict[string, dict[string, dict]]):
+                 map from user_id -> device_id -> device details
+        """
+        local_query = []
+
+        result_dict = {}
+        for user_id, device_ids in query.items():
+            if not self.is_mine_id(user_id):
+                logger.warning("Request for keys for non-local user %s",
+                               user_id)
+                raise errors.SynapseError(400, "Not a user here")
+
+            if not device_ids:
+                local_query.append((user_id, None))
+            else:
+                for device_id in device_ids:
+                    local_query.append((user_id, device_id))
+
+            # make sure that each queried user appears in the result dict
+            result_dict[user_id] = {}
+
+        results = yield self.store.get_e2e_device_keys(local_query)
+
+        # Build the result structure, un-jsonify the results, and add the
+        # "unsigned" section
+        for user_id, device_keys in results.items():
+            for device_id, device_info in device_keys.items():
+                r = json.loads(device_info["key_json"])
+                r["unsigned"] = {}
+                display_name = device_info["device_display_name"]
+                if display_name is not None:
+                    r["unsigned"]["device_display_name"] = display_name
+                result_dict[user_id][device_id] = r
+
+        defer.returnValue(result_dict)
+
+    @defer.inlineCallbacks
+    def on_federation_query_client_keys(self, query_body):
+        """ Handle a device key query from a federated server
+        """
+        device_keys_query = query_body.get("device_keys", {})
+        res = yield self.query_local_devices(device_keys_query)
+        defer.returnValue({"device_keys": res})
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index dc1d4d8fc6..c5ff16adf3 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -130,9 +130,7 @@ class KeyUploadServlet(RestServlet):
         # old access_token without an associated device_id. Either way, we
         # need to double-check the device is registered to avoid ending up with
         # keys without a corresponding device.
-        self.device_handler.check_device_registered(
-            user_id, device_id, "unknown device"
-        )
+        self.device_handler.check_device_registered(user_id, device_id)
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
         defer.returnValue((200, {"one_time_key_counts": result}))
@@ -186,17 +184,19 @@ class KeyQueryServlet(RestServlet):
     )
 
     def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer):
+        """
         super(KeyQueryServlet, self).__init__()
-        self.store = hs.get_datastore()
         self.auth = hs.get_auth()
-        self.federation = hs.get_replication_layer()
-        self.is_mine = hs.is_mine
+        self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request, user_id, device_id):
         yield self.auth.get_user_by_req(request)
         body = parse_json_object_from_request(request)
-        result = yield self.handle_request(body)
+        result = yield self.e2e_keys_handler.query_devices(body)
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -205,45 +205,11 @@ class KeyQueryServlet(RestServlet):
         auth_user_id = requester.user.to_string()
         user_id = user_id if user_id else auth_user_id
         device_ids = [device_id] if device_id else []
-        result = yield self.handle_request(
+        result = yield self.e2e_keys_handler.query_devices(
             {"device_keys": {user_id: device_ids}}
         )
         defer.returnValue(result)
 
-    @defer.inlineCallbacks
-    def handle_request(self, body):
-        local_query = []
-        remote_queries = {}
-        for user_id, device_ids in body.get("device_keys", {}).items():
-            user = UserID.from_string(user_id)
-            if self.is_mine(user):
-                if not device_ids:
-                    local_query.append((user_id, None))
-                else:
-                    for device_id in device_ids:
-                        local_query.append((user_id, device_id))
-            else:
-                remote_queries.setdefault(user.domain, {})[user_id] = list(
-                    device_ids
-                )
-        results = yield self.store.get_e2e_device_keys(local_query)
-
-        json_result = {}
-        for user_id, device_keys in results.items():
-            for device_id, json_bytes in device_keys.items():
-                json_result.setdefault(user_id, {})[device_id] = json.loads(
-                    json_bytes
-                )
-
-        for destination, device_keys in remote_queries.items():
-            remote_result = yield self.federation.query_client_keys(
-                destination, {"device_keys": device_keys}
-            )
-            for user_id, keys in remote_result["device_keys"].items():
-                if user_id in device_keys:
-                    json_result[user_id] = keys
-        defer.returnValue((200, {"device_keys": json_result}))
-
 
 class OneTimeKeyServlet(RestServlet):
     """
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 74c64f1371..4060593f7f 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -29,6 +29,8 @@ from synapse.http.server import (
 from synapse.util.async import ObservableDeferred
 from synapse.util.stringutils import is_ascii
 
+from copy import deepcopy
+
 import os
 import re
 import fnmatch
@@ -329,20 +331,23 @@ class PreviewUrlResource(Resource):
                 # ...or if they are within a <script/> or <style/> tag.
                 # This is a very very very coarse approximation to a plain text
                 # render of the page.
-                text_nodes = tree.xpath("//text()[not(ancestor::header | ancestor::nav | "
-                                        "ancestor::aside | ancestor::footer | "
-                                        "ancestor::script | ancestor::style)]" +
-                                        "[ancestor::body]")
-                text = ''
-                for text_node in text_nodes:
-                    if len(text) < 500:
-                        text += text_node + ' '
-                    else:
-                        break
-                text = re.sub(r'[\t ]+', ' ', text)
-                text = re.sub(r'[\t \r\n]*[\r\n]+', '\n', text)
-                text = text.strip()[:500]
-                og['og:description'] = text if text else None
+
+                # We don't just use XPATH here as that is slow on some machines.
+
+                # We clone `tree` as we modify it.
+                cloned_tree = deepcopy(tree.find("body"))
+
+                TAGS_TO_REMOVE = ("header", "nav", "aside", "footer", "script", "style",)
+                for el in cloned_tree.iter(TAGS_TO_REMOVE):
+                    el.getparent().remove(el)
+
+                # Split all the text nodes into paragraphs (by splitting on new
+                # lines)
+                text_nodes = (
+                    re.sub(r'\s+', '\n', el.text).strip()
+                    for el in cloned_tree.iter() if el.text
+                )
+                og['og:description'] = summarize_paragraphs(text_nodes)
 
         # TODO: delete the url downloads to stop diskfilling,
         # as we only ever cared about its OG
@@ -450,3 +455,56 @@ class PreviewUrlResource(Resource):
             content_type.startswith("application/xhtml")
         ):
             return True
+
+
+def summarize_paragraphs(text_nodes, min_size=200, max_size=500):
+    # Try to get a summary of between 200 and 500 words, respecting
+    # first paragraph and then word boundaries.
+    # TODO: Respect sentences?
+
+    description = ''
+
+    # Keep adding paragraphs until we get to the MIN_SIZE.
+    for text_node in text_nodes:
+        if len(description) < min_size:
+            text_node = re.sub(r'[\t \r\n]+', ' ', text_node)
+            description += text_node + '\n\n'
+        else:
+            break
+
+    description = description.strip()
+    description = re.sub(r'[\t ]+', ' ', description)
+    description = re.sub(r'[\t \r\n]*[\r\n]+', '\n\n', description)
+
+    # If the concatenation of paragraphs to get above MIN_SIZE
+    # took us over MAX_SIZE, then we need to truncate mid paragraph
+    if len(description) > max_size:
+        new_desc = ""
+
+        # This splits the paragraph into words, but keeping the
+        # (preceeding) whitespace intact so we can easily concat
+        # words back together.
+        for match in re.finditer("\s*\S+", description):
+            word = match.group()
+
+            # Keep adding words while the total length is less than
+            # MAX_SIZE.
+            if len(word) + len(new_desc) < max_size:
+                new_desc += word
+            else:
+                # At this point the next word *will* take us over
+                # MAX_SIZE, but we also want to ensure that its not
+                # a huge word. If it is add it anyway and we'll
+                # truncate later.
+                if len(new_desc) < min_size:
+                    new_desc += word
+                break
+
+        # Double check that we're not over the limit
+        if len(new_desc) > max_size:
+            new_desc = new_desc[:max_size]
+
+        # We always add an ellipsis because at the very least
+        # we chopped mid paragraph.
+        description = new_desc.strip() + "…"
+    return description if description else None
diff --git a/synapse/server.py b/synapse/server.py
index e8b166990d..6bb4988309 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -19,39 +19,38 @@
 # partial one for unit test mocking.
 
 # Imports required for the default HomeServer() implementation
-from twisted.web.client import BrowserLikePolicyForHTTPS
+import logging
+
 from twisted.enterprise import adbapi
+from twisted.web.client import BrowserLikePolicyForHTTPS
 
-from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.api.auth import Auth
+from synapse.api.filtering import Filtering
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.appservice.api import ApplicationServiceApi
+from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.crypto.keyring import Keyring
+from synapse.events.builder import EventBuilderFactory
 from synapse.federation import initialize_http_replication
-from synapse.handlers.device import DeviceHandler
-from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
-from synapse.notifier import Notifier
-from synapse.api.auth import Auth
 from synapse.handlers import Handlers
+from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.handlers.auth import AuthHandler
+from synapse.handlers.device import DeviceHandler
+from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.presence import PresenceHandler
+from synapse.handlers.room import RoomListHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
-from synapse.handlers.room import RoomListHandler
-from synapse.handlers.auth import AuthHandler
-from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
+from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.notifier import Notifier
+from synapse.push.pusherpool import PusherPool
+from synapse.rest.media.v1.media_repository import MediaRepository
 from synapse.state import StateHandler
 from synapse.storage import DataStore
+from synapse.streams.events import EventSources
 from synapse.util import Clock
 from synapse.util.distributor import Distributor
-from synapse.streams.events import EventSources
-from synapse.api.ratelimiting import Ratelimiter
-from synapse.crypto.keyring import Keyring
-from synapse.push.pusherpool import PusherPool
-from synapse.events.builder import EventBuilderFactory
-from synapse.api.filtering import Filtering
-from synapse.rest.media.v1.media_repository import MediaRepository
-
-from synapse.http.matrixfederationclient import MatrixFederationHttpClient
-
-import logging
-
 
 logger = logging.getLogger(__name__)
 
@@ -94,6 +93,7 @@ class HomeServer(object):
         'room_list_handler',
         'auth_handler',
         'device_handler',
+        'e2e_keys_handler',
         'application_service_api',
         'application_service_scheduler',
         'application_service_handler',
@@ -202,6 +202,9 @@ class HomeServer(object):
     def build_device_handler(self):
         return DeviceHandler(self)
 
+    def build_e2e_keys_handler(self):
+        return E2eKeysHandler(self)
+
     def build_application_service_api(self):
         return ApplicationServiceApi(self)
 
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 902f725c06..c0aa868c4f 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -1,6 +1,7 @@
 import synapse.handlers
 import synapse.handlers.auth
 import synapse.handlers.device
+import synapse.handlers.e2e_keys
 import synapse.storage
 import synapse.state
 
@@ -14,6 +15,9 @@ class HomeServer(object):
     def get_device_handler(self) -> synapse.handlers.device.DeviceHandler:
         pass
 
+    def get_e2e_keys_handler(self) -> synapse.handlers.e2e_keys.E2eKeysHandler:
+        pass
+
     def get_handlers(self) -> synapse.handlers.Handlers:
         pass
 
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 62b7790e91..385d607056 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import collections
 
 import twisted.internet.defer
 
@@ -38,24 +39,49 @@ class EndToEndKeyStore(SQLBaseStore):
             query_list(list): List of pairs of user_ids and device_ids.
         Returns:
             Dict mapping from user-id to dict mapping from device_id to
-            key json byte strings.
+            dict containing "key_json", "device_display_name".
         """
-        def _get_e2e_device_keys(txn):
-            result = {}
-            for user_id, device_id in query_list:
-                user_result = result.setdefault(user_id, {})
-                keyvalues = {"user_id": user_id}
-                if device_id:
-                    keyvalues["device_id"] = device_id
-                rows = self._simple_select_list_txn(
-                    txn, table="e2e_device_keys_json",
-                    keyvalues=keyvalues,
-                    retcols=["device_id", "key_json"]
-                )
-                for row in rows:
-                    user_result[row["device_id"]] = row["key_json"]
-            return result
-        return self.runInteraction("get_e2e_device_keys", _get_e2e_device_keys)
+        if not query_list:
+            return {}
+
+        return self.runInteraction(
+            "get_e2e_device_keys", self._get_e2e_device_keys_txn, query_list
+        )
+
+    def _get_e2e_device_keys_txn(self, txn, query_list):
+        query_clauses = []
+        query_params = []
+
+        for (user_id, device_id) in query_list:
+            query_clause = "k.user_id = ?"
+            query_params.append(user_id)
+
+            if device_id:
+                query_clause += " AND k.device_id = ?"
+                query_params.append(device_id)
+
+            query_clauses.append(query_clause)
+
+        sql = (
+            "SELECT k.user_id, k.device_id, "
+            "    d.display_name AS device_display_name, "
+            "    k.key_json"
+            " FROM e2e_device_keys_json k"
+            "    LEFT JOIN devices d ON d.user_id = k.user_id"
+            "      AND d.device_id = k.device_id"
+            " WHERE %s"
+        ) % (
+            " OR ".join("(" + q + ")" for q in query_clauses)
+        )
+
+        txn.execute(sql, query_params)
+        rows = self.cursor_to_dict(txn)
+
+        result = collections.defaultdict(dict)
+        for row in rows:
+            result[row["user_id"]][row["device_id"]] = row
+
+        return result
 
     def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list):
         def _add_e2e_one_time_keys(txn):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c63ca36df6..e4dbaa3547 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -26,7 +26,8 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 
 from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple
+from collections import deque, namedtuple, OrderedDict
+from functools import wraps
 
 import synapse
 import synapse.metrics
@@ -150,6 +151,26 @@ class _EventPeristenceQueue(object):
 _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
 
 
+def _retry_on_integrity_error(func):
+    """Wraps a database function so that it gets retried on IntegrityError,
+    with `delete_existing=True` passed in.
+
+    Args:
+        func: function that returns a Deferred and accepts a `delete_existing` arg
+    """
+    @wraps(func)
+    @defer.inlineCallbacks
+    def f(self, *args, **kwargs):
+        try:
+            res = yield func(self, *args, **kwargs)
+        except self.database_engine.module.IntegrityError:
+            logger.exception("IntegrityError, retrying.")
+            res = yield func(self, *args, delete_existing=True, **kwargs)
+        defer.returnValue(res)
+
+    return f
+
+
 class EventsStore(SQLBaseStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
@@ -229,8 +250,10 @@ class EventsStore(SQLBaseStore):
 
         self._event_persist_queue.handle_queue(room_id, persisting_queue)
 
+    @_retry_on_integrity_error
     @defer.inlineCallbacks
-    def _persist_events(self, events_and_contexts, backfilled=False):
+    def _persist_events(self, events_and_contexts, backfilled=False,
+                        delete_existing=False):
         if not events_and_contexts:
             return
 
@@ -273,12 +296,15 @@ class EventsStore(SQLBaseStore):
                         self._persist_events_txn,
                         events_and_contexts=chunk,
                         backfilled=backfilled,
+                        delete_existing=delete_existing,
                     )
                     persist_event_counter.inc_by(len(chunk))
 
+    @_retry_on_integrity_error
     @defer.inlineCallbacks
     @log_function
-    def _persist_event(self, event, context, current_state=None, backfilled=False):
+    def _persist_event(self, event, context, current_state=None, backfilled=False,
+                       delete_existing=False):
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
                 with self._state_groups_id_gen.get_next() as state_group_id:
@@ -291,6 +317,7 @@ class EventsStore(SQLBaseStore):
                         context=context,
                         current_state=current_state,
                         backfilled=backfilled,
+                        delete_existing=delete_existing,
                     )
                     persist_event_counter.inc()
         except _RollbackButIsFineException:
@@ -353,7 +380,8 @@ class EventsStore(SQLBaseStore):
         defer.returnValue({e.event_id: e for e in events})
 
     @log_function
-    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False):
+    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
+                           delete_existing=False):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
@@ -393,16 +421,38 @@ class EventsStore(SQLBaseStore):
             txn,
             [(event, context)],
             backfilled=backfilled,
+            delete_existing=delete_existing,
         )
 
     @log_function
-    def _persist_events_txn(self, txn, events_and_contexts, backfilled):
+    def _persist_events_txn(self, txn, events_and_contexts, backfilled,
+                            delete_existing=False):
         """Insert some number of room events into the necessary database tables.
 
         Rejected events are only inserted into the events table, the events_json table,
         and the rejections table. Things reading from those table will need to check
         whether the event was rejected.
+
+        If delete_existing is True then existing events will be purged from the
+        database before insertion. This is useful when retrying due to IntegrityError.
         """
+        # Ensure that we don't have the same event twice.
+        # Pick the earliest non-outlier if there is one, else the earliest one.
+        new_events_and_contexts = OrderedDict()
+        for event, context in events_and_contexts:
+            prev_event_context = new_events_and_contexts.get(event.event_id)
+            if prev_event_context:
+                if not event.internal_metadata.is_outlier():
+                    if prev_event_context[0].internal_metadata.is_outlier():
+                        # To ensure correct ordering we pop, as OrderedDict is
+                        # ordered by first insertion.
+                        new_events_and_contexts.pop(event.event_id, None)
+                        new_events_and_contexts[event.event_id] = (event, context)
+            else:
+                new_events_and_contexts[event.event_id] = (event, context)
+
+        events_and_contexts = new_events_and_contexts.values()
+
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
@@ -433,8 +483,6 @@ class EventsStore(SQLBaseStore):
             for event_id, outlier in txn.fetchall()
         }
 
-        # Remove the events that we've seen before.
-        event_map = {}
         to_remove = set()
         for event, context in events_and_contexts:
             if context.rejected:
@@ -445,23 +493,6 @@ class EventsStore(SQLBaseStore):
                     to_remove.add(event)
                 continue
 
-            # Handle the case of the list including the same event multiple
-            # times. The tricky thing here is when they differ by whether
-            # they are an outlier.
-            if event.event_id in event_map:
-                other = event_map[event.event_id]
-
-                if not other.internal_metadata.is_outlier():
-                    to_remove.add(event)
-                    continue
-                elif not event.internal_metadata.is_outlier():
-                    to_remove.add(event)
-                    continue
-                else:
-                    to_remove.add(other)
-
-            event_map[event.event_id] = event
-
             if event.event_id not in have_persisted:
                 continue
 
@@ -539,6 +570,43 @@ class EventsStore(SQLBaseStore):
                 ]
             }
 
+        if delete_existing:
+            # For paranoia reasons, we go and delete all the existing entries
+            # for these events so we can reinsert them.
+            # This gets around any problems with some tables already having
+            # entries.
+
+            logger.info("Deleting existing")
+
+            for table in (
+                "events",
+                "event_auth",
+                "event_json",
+                "event_content_hashes",
+                "event_destinations",
+                "event_edge_hashes",
+                "event_edges",
+                "event_forward_extremities",
+                "event_push_actions",
+                "event_reference_hashes",
+                "event_search",
+                "event_signatures",
+                "event_to_state_groups",
+                "guest_access",
+                "history_visibility",
+                "local_invites",
+                "room_names",
+                "state_events",
+                "rejections",
+                "redactions",
+                "room_memberships",
+                "state_events"
+            ):
+                txn.executemany(
+                    "DELETE FROM %s WHERE event_id = ?" % (table,),
+                    [(ev.event_id,) for ev, _ in events_and_contexts]
+                )
+
         self._simple_insert_many_txn(
             txn,
             table="event_json",
diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
index 140f2b63e0..aa4a3b9f2f 100644
--- a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
+++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
@@ -16,4 +16,4 @@
 -- make sure that we have a device record for each set of E2E keys, so that the
 -- user can delete them if they like.
 INSERT INTO devices
-    SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json;
+    SELECT user_id, device_id, NULL FROM e2e_device_keys_json;
diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql
new file mode 100644
index 0000000000..6671573398
--- /dev/null
+++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql
@@ -0,0 +1,20 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- a previous version of the "devices_for_e2e_keys" delta set all the device
+-- names to "unknown device". This wasn't terribly helpful
+UPDATE devices
+    SET display_name = NULL
+    WHERE display_name = 'unknown device';