summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/crypto/keyclient.py12
-rw-r--r--synapse/handlers/device.py4
-rw-r--r--synapse/handlers/federation.py35
-rw-r--r--synapse/push/emailpusher.py5
-rw-r--r--synapse/push/httppusher.py3
-rw-r--r--synapse/push/push_tools.py2
-rw-r--r--synapse/replication/slave/storage/events.py7
-rw-r--r--synapse/rest/client/v2_alpha/keys.py48
-rw-r--r--synapse/rest/client/versions.py6
-rw-r--r--synapse/storage/end_to_end_keys.py15
-rw-r--r--synapse/storage/event_push_actions.py203
-rw-r--r--synapse/storage/schema/delta/33/devices_for_e2e_keys.sql19
-rw-r--r--synapse/util/presentable_names.py5
-rw-r--r--synapse/util/retryutils.py2
15 files changed, 304 insertions, 64 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 2750ad3f7a..8f0176e182 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.16.1-r1"
+__version__ = "0.17.0-rc1"
diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py
index 54b83da9d8..c2bd64d6c2 100644
--- a/synapse/crypto/keyclient.py
+++ b/synapse/crypto/keyclient.py
@@ -77,10 +77,12 @@ class SynapseKeyClientProtocol(HTTPClient):
     def __init__(self):
         self.remote_key = defer.Deferred()
         self.host = None
+        self._peer = None
 
     def connectionMade(self):
-        self.host = self.transport.getHost()
-        logger.debug("Connected to %s", self.host)
+        self._peer = self.transport.getPeer()
+        logger.debug("Connected to %s", self._peer)
+
         self.sendCommand(b"GET", self.path)
         if self.host:
             self.sendHeader(b"Host", self.host)
@@ -124,7 +126,10 @@ class SynapseKeyClientProtocol(HTTPClient):
         self.timer.cancel()
 
     def on_timeout(self):
-        logger.debug("Timeout waiting for response from %s", self.host)
+        logger.debug(
+            "Timeout waiting for response from %s: %s",
+            self.host, self._peer,
+        )
         self.errback(IOError("Timeout waiting for response"))
         self.transport.abortConnection()
 
@@ -133,4 +138,5 @@ class SynapseKeyClientFactory(Factory):
     def protocol(self):
         protocol = SynapseKeyClientProtocol()
         protocol.path = self.path
+        protocol.host = self.host
         return protocol
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index eaead50800..f4bf159bb5 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -143,6 +143,10 @@ class DeviceHandler(BaseHandler):
             delete_refresh_tokens=True,
         )
 
+        yield self.store.delete_e2e_keys_by_device(
+            user_id=user_id, device_id=device_id
+        )
+
     @defer.inlineCallbacks
     def update_device(self, user_id, device_id, content):
         """ Update the given device
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3f138daf17..1323235b62 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -124,7 +124,7 @@ class FederationHandler(BaseHandler):
 
             try:
                 event_stream_id, max_stream_id = yield self._persist_auth_tree(
-                    auth_chain, state, event
+                    origin, auth_chain, state, event
                 )
             except AuthError as e:
                 raise FederationError(
@@ -637,7 +637,7 @@ class FederationHandler(BaseHandler):
                 pass
 
             event_stream_id, max_stream_id = yield self._persist_auth_tree(
-                auth_chain, state, event
+                origin, auth_chain, state, event
             )
 
             with PreserveLoggingContext():
@@ -1155,11 +1155,19 @@ class FederationHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def _persist_auth_tree(self, auth_events, state, event):
+    def _persist_auth_tree(self, origin, auth_events, state, event):
         """Checks the auth chain is valid (and passes auth checks) for the
         state and event. Then persists the auth chain and state atomically.
         Persists the event seperately.
 
+        Will attempt to fetch missing auth events.
+
+        Args:
+            origin (str): Where the events came from
+            auth_events (list)
+            state (list)
+            event (Event)
+
         Returns:
             2-tuple of (event_stream_id, max_stream_id) from the persist_event
             call for `event`
@@ -1172,7 +1180,7 @@ class FederationHandler(BaseHandler):
 
         event_map = {
             e.event_id: e
-            for e in auth_events
+            for e in itertools.chain(auth_events, state, [event])
         }
 
         create_event = None
@@ -1181,10 +1189,29 @@ class FederationHandler(BaseHandler):
                 create_event = e
                 break
 
+        missing_auth_events = set()
+        for e in itertools.chain(auth_events, state, [event]):
+            for e_id, _ in e.auth_events:
+                if e_id not in event_map:
+                    missing_auth_events.add(e_id)
+
+        for e_id in missing_auth_events:
+            m_ev = yield self.replication_layer.get_pdu(
+                [origin],
+                e_id,
+                outlier=True,
+                timeout=10000,
+            )
+            if m_ev and m_ev.event_id == e_id:
+                event_map[e_id] = m_ev
+            else:
+                logger.info("Failed to find auth event %r", e_id)
+
         for e in itertools.chain(auth_events, state, [event]):
             auth_for_e = {
                 (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
                 for e_id, _ in e.auth_events
+                if e_id in event_map
             }
             if create_event:
                 auth_for_e[(EventTypes.Create, "")] = create_event
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 12a3ec7fd8..e224b68291 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -140,9 +140,8 @@ class EmailPusher(object):
         being run.
         """
         start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
-        unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
-            self.user_id, start, self.max_stream_ordering
-        )
+        fn = self.store.get_unread_push_actions_for_user_in_range_for_email
+        unprocessed = yield fn(self.user_id, start, self.max_stream_ordering)
 
         soonest_due_at = None
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 2acc6cc214..9a7db61220 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -141,7 +141,8 @@ class HttpPusher(object):
         run once per pusher.
         """
 
-        unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
+        fn = self.store.get_unread_push_actions_for_user_in_range_for_http
+        unprocessed = yield fn(
             self.user_id, self.last_stream_ordering, self.max_stream_ordering
         )
 
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 6f2d1ad57d..d555a33e9a 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -54,7 +54,7 @@ def get_context_for_event(state_handler, ev, user_id):
     room_state = yield state_handler.get_current_state(ev.room_id)
 
     # we no longer bother setting room_alias, and make room_name the
-    # human-readable name instead, be that m.room.namer, an alias or
+    # human-readable name instead, be that m.room.name, an alias or
     # a list of people in the room
     name = calculate_room_name(
         room_state, user_id, fallback_to_single_member=False
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 369d839464..6a644f1386 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -93,8 +93,11 @@ class SlavedEventStore(BaseSlavedStore):
         StreamStore.__dict__["get_recent_event_ids_for_room"]
     )
 
-    get_unread_push_actions_for_user_in_range = (
-        DataStore.get_unread_push_actions_for_user_in_range.__func__
+    get_unread_push_actions_for_user_in_range_for_http = (
+        DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
+    )
+    get_unread_push_actions_for_user_in_range_for_email = (
+        DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
     )
     get_push_action_users_in_range = (
         DataStore.get_push_action_users_in_range.__func__
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 56364af337..dc1d4d8fc6 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -19,6 +19,9 @@ import simplejson as json
 from canonicaljson import encode_canonical_json
 from twisted.internet import defer
 
+import synapse.api.errors
+import synapse.server
+import synapse.types
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.types import UserID
 from ._base import client_v2_patterns
@@ -28,7 +31,7 @@ logger = logging.getLogger(__name__)
 
 class KeyUploadServlet(RestServlet):
     """
-    POST /keys/upload/<device_id> HTTP/1.1
+    POST /keys/upload HTTP/1.1
     Content-Type: application/json
 
     {
@@ -51,23 +54,45 @@ class KeyUploadServlet(RestServlet):
       },
     }
     """
-    PATTERNS = client_v2_patterns("/keys/upload/(?P<device_id>[^/]*)", releases=())
+    PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$",
+                                  releases=())
 
     def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
         super(KeyUploadServlet, self).__init__()
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
         self.auth = hs.get_auth()
+        self.device_handler = hs.get_device_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request, device_id):
         requester = yield self.auth.get_user_by_req(request)
+
         user_id = requester.user.to_string()
-        # TODO: Check that the device_id matches that in the authentication
-        # or derive the device_id from the authentication instead.
 
         body = parse_json_object_from_request(request)
 
+        if device_id is not None:
+            # passing the device_id here is deprecated; however, we allow it
+            # for now for compatibility with older clients.
+            if (requester.device_id is not None and
+                    device_id != requester.device_id):
+                logger.warning("Client uploading keys for a different device "
+                               "(logged in as %s, uploading for %s)",
+                               requester.device_id, device_id)
+        else:
+            device_id = requester.device_id
+
+        if device_id is None:
+            raise synapse.api.errors.SynapseError(
+                400,
+                "To upload keys, you must pass device_id when authenticating"
+            )
+
         time_now = self.clock.time_msec()
 
         # TODO: Validate the JSON to make sure it has the right keys.
@@ -100,13 +125,14 @@ class KeyUploadServlet(RestServlet):
                 user_id, device_id, time_now, key_list
             )
 
-        result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
-        defer.returnValue((200, {"one_time_key_counts": result}))
-
-    @defer.inlineCallbacks
-    def on_GET(self, request, device_id):
-        requester = yield self.auth.get_user_by_req(request)
-        user_id = requester.user.to_string()
+        # the device should have been registered already, but it may have been
+        # deleted due to a race with a DELETE request. Or we may be using an
+        # old access_token without an associated device_id. Either way, we
+        # need to double-check the device is registered to avoid ending up with
+        # keys without a corresponding device.
+        self.device_handler.check_device_registered(
+            user_id, device_id, "unknown device"
+        )
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
         defer.returnValue((200, {"one_time_key_counts": result}))
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index ca5468c402..e984ea47db 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -26,7 +26,11 @@ class VersionsRestServlet(RestServlet):
 
     def on_GET(self, request):
         return (200, {
-            "versions": ["r0.0.1"]
+            "versions": [
+                "r0.0.1",
+                "r0.1.0",
+                "r0.2.0",
+            ]
         })
 
 
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 2e89066515..62b7790e91 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import twisted.internet.defer
+
 from ._base import SQLBaseStore
 
 
@@ -123,3 +125,16 @@ class EndToEndKeyStore(SQLBaseStore):
         return self.runInteraction(
             "claim_e2e_one_time_keys", _claim_e2e_one_time_keys
         )
+
+    @twisted.internet.defer.inlineCallbacks
+    def delete_e2e_keys_by_device(self, user_id, device_id):
+        yield self._simple_delete(
+            table="e2e_device_keys_json",
+            keyvalues={"user_id": user_id, "device_id": device_id},
+            desc="delete_e2e_device_keys_by_device"
+        )
+        yield self._simple_delete(
+            table="e2e_one_time_keys_json",
+            keyvalues={"user_id": user_id, "device_id": device_id},
+            desc="delete_e2e_one_time_keys_by_device"
+        )
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 3d93285f84..df4000d0da 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -117,24 +117,42 @@ class EventPushActionsStore(SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_unread_push_actions_for_user_in_range(self, user_id,
-                                                  min_stream_ordering,
-                                                  max_stream_ordering=None,
-                                                  limit=20):
+    def get_unread_push_actions_for_user_in_range_for_http(
+        self, user_id, min_stream_ordering, max_stream_ordering, limit=20
+    ):
+        """Get a list of the most recent unread push actions for a given user,
+        within the given stream ordering range. Called by the httppusher.
+
+        Args:
+            user_id (str): The user to fetch push actions for.
+            min_stream_ordering(int): The exclusive lower bound on the
+                stream ordering of event push actions to fetch.
+            max_stream_ordering(int): The inclusive upper bound on the
+                stream ordering of event push actions to fetch.
+            limit (int): The maximum number of rows to return.
+        Returns:
+            A promise which resolves to a list of dicts with the keys "event_id",
+            "room_id", "stream_ordering", "actions".
+            The list will be ordered by ascending stream_ordering.
+            The list will have between 0~limit entries.
+        """
+        # find rooms that have a read receipt in them and return the next
+        # push actions
         def get_after_receipt(txn):
+            # find rooms that have a read receipt in them and return the next
+            # push actions
             sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
-                "e.received_ts "
-                "FROM ("
-                "   SELECT room_id, user_id, "
-                "       max(topological_ordering) as topological_ordering, "
-                "       max(stream_ordering) as stream_ordering "
-                "       FROM events"
-                "   NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
-                "   GROUP BY room_id, user_id"
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions"
+                " FROM ("
+                "   SELECT room_id,"
+                "       MAX(topological_ordering) as topological_ordering,"
+                "       MAX(stream_ordering) as stream_ordering"
+                "   FROM events"
+                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
+                "   WHERE receipt_type = 'm.read' AND user_id = ?"
+                "   GROUP BY room_id"
                 ") AS rl,"
                 " event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
                 " WHERE"
                 "   ep.room_id = rl.room_id"
                 "   AND ("
@@ -144,44 +162,159 @@ class EventPushActionsStore(SQLBaseStore):
                 "           AND ep.stream_ordering > rl.stream_ordering"
                 "       )"
                 "   )"
-                "   AND ep.stream_ordering > ?"
                 "   AND ep.user_id = ?"
-                "   AND ep.user_id = rl.user_id"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering ASC LIMIT ?"
             )
-            args = [min_stream_ordering, user_id]
-            if max_stream_ordering is not None:
-                sql += " AND ep.stream_ordering <= ?"
-                args.append(max_stream_ordering)
-            sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            args.append(limit)
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
             txn.execute(sql, args)
             return txn.fetchall()
         after_read_receipt = yield self.runInteraction(
-            "get_unread_push_actions_for_user_in_range", get_after_receipt
+            "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt
         )
 
+        # There are rooms with push actions in them but you don't have a read receipt in
+        # them e.g. rooms you've been invited to, so get push actions for rooms which do
+        # not have read receipts in them too.
         def get_no_receipt(txn):
             sql = (
                 "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
                 " e.received_ts"
                 " FROM event_push_actions AS ep"
-                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
-                " WHERE ep.room_id not in ("
-                "   SELECT room_id FROM events NATURAL JOIN receipts_linearized"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
+                "   ep.room_id NOT IN ("
+                "     SELECT room_id FROM receipts_linearized"
+                "       WHERE receipt_type = 'm.read' AND user_id = ?"
+                "       GROUP BY room_id"
+                "   )"
+                "   AND ep.user_id = ?"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering ASC LIMIT ?"
+            )
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
+            txn.execute(sql, args)
+            return txn.fetchall()
+        no_read_receipt = yield self.runInteraction(
+            "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt
+        )
+
+        notifs = [
+            {
+                "event_id": row[0],
+                "room_id": row[1],
+                "stream_ordering": row[2],
+                "actions": json.loads(row[3]),
+            } for row in after_read_receipt + no_read_receipt
+        ]
+
+        # Now sort it so it's ordered correctly, since currently it will
+        # contain results from the first query, correctly ordered, followed
+        # by results from the second query, but we want them all ordered
+        # by stream_ordering, oldest first.
+        notifs.sort(key=lambda r: r['stream_ordering'])
+
+        # Take only up to the limit. We have to stop at the limit because
+        # one of the subqueries may have hit the limit.
+        defer.returnValue(notifs[:limit])
+
+    @defer.inlineCallbacks
+    def get_unread_push_actions_for_user_in_range_for_email(
+        self, user_id, min_stream_ordering, max_stream_ordering, limit=20
+    ):
+        """Get a list of the most recent unread push actions for a given user,
+        within the given stream ordering range. Called by the emailpusher
+
+        Args:
+            user_id (str): The user to fetch push actions for.
+            min_stream_ordering(int): The exclusive lower bound on the
+                stream ordering of event push actions to fetch.
+            max_stream_ordering(int): The inclusive upper bound on the
+                stream ordering of event push actions to fetch.
+            limit (int): The maximum number of rows to return.
+        Returns:
+            A promise which resolves to a list of dicts with the keys "event_id",
+            "room_id", "stream_ordering", "actions", "received_ts".
+            The list will be ordered by descending received_ts.
+            The list will have between 0~limit entries.
+        """
+        # find rooms that have a read receipt in them and return the most recent
+        # push actions
+        def get_after_receipt(txn):
+            sql = (
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+                "  e.received_ts"
+                " FROM ("
+                "   SELECT room_id,"
+                "       MAX(topological_ordering) as topological_ordering,"
+                "       MAX(stream_ordering) as stream_ordering"
+                "   FROM events"
+                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
                 "   WHERE receipt_type = 'm.read' AND user_id = ?"
                 "   GROUP BY room_id"
-                ") AND ep.user_id = ? AND ep.stream_ordering > ?"
+                ") AS rl,"
+                " event_push_actions AS ep"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
+                "   ep.room_id = rl.room_id"
+                "   AND ("
+                "       ep.topological_ordering > rl.topological_ordering"
+                "       OR ("
+                "           ep.topological_ordering = rl.topological_ordering"
+                "           AND ep.stream_ordering > rl.stream_ordering"
+                "       )"
+                "   )"
+                "   AND ep.user_id = ?"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering DESC LIMIT ?"
+            )
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
+            txn.execute(sql, args)
+            return txn.fetchall()
+        after_read_receipt = yield self.runInteraction(
+            "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt
+        )
+
+        # There are rooms with push actions in them but you don't have a read receipt in
+        # them e.g. rooms you've been invited to, so get push actions for rooms which do
+        # not have read receipts in them too.
+        def get_no_receipt(txn):
+            sql = (
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+                " e.received_ts"
+                " FROM event_push_actions AS ep"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
+                "   ep.room_id NOT IN ("
+                "     SELECT room_id FROM receipts_linearized"
+                "       WHERE receipt_type = 'm.read' AND user_id = ?"
+                "       GROUP BY room_id"
+                "   )"
+                "   AND ep.user_id = ?"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.stream_ordering <= ?"
+                " ORDER BY ep.stream_ordering DESC LIMIT ?"
             )
-            args = [user_id, user_id, min_stream_ordering]
-            if max_stream_ordering is not None:
-                sql += " AND ep.stream_ordering <= ?"
-                args.append(max_stream_ordering)
-            sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            args.append(limit)
+            args = [
+                user_id, user_id,
+                min_stream_ordering, max_stream_ordering, limit,
+            ]
             txn.execute(sql, args)
             return txn.fetchall()
         no_read_receipt = yield self.runInteraction(
-            "get_unread_push_actions_for_user_in_range", get_no_receipt
+            "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt
         )
 
         # Make a list of dicts from the two sets of results.
@@ -198,7 +331,7 @@ class EventPushActionsStore(SQLBaseStore):
         # Now sort it so it's ordered correctly, since currently it will
         # contain results from the first query, correctly ordered, followed
         # by results from the second query, but we want them all ordered
-        # by received_ts
+        # by received_ts (most recent first)
         notifs.sort(key=lambda r: -(r['received_ts'] or 0))
 
         # Now return the first `limit`
diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
new file mode 100644
index 0000000000..140f2b63e0
--- /dev/null
+++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
@@ -0,0 +1,19 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- make sure that we have a device record for each set of E2E keys, so that the
+-- user can delete them if they like.
+INSERT INTO devices
+    SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json;
diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py
index 4c54812e6f..f68676e9e7 100644
--- a/synapse/util/presentable_names.py
+++ b/synapse/util/presentable_names.py
@@ -83,7 +83,10 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True,
     ):
         if ("m.room.member", my_member_event.sender) in room_state:
             inviter_member_event = room_state[("m.room.member", my_member_event.sender)]
-            return "Invite from %s" % (name_from_member_event(inviter_member_event),)
+            if fallback_to_single_member:
+                return "Invite from %s" % (name_from_member_event(inviter_member_event),)
+            else:
+                return None
         else:
             return "Room Invite"
 
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 43cf11f3f6..49527f4d21 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -128,7 +128,7 @@ class RetryDestinationLimiter(object):
             )
 
         valid_err_code = False
-        if exc_type is CodeMessageException:
+        if exc_type is not None and issubclass(exc_type, CodeMessageException):
             valid_err_code = 0 <= exc_val.code < 500
 
         if exc_type is None or valid_err_code: