summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py3
-rw-r--r--synapse/api/auth.py2
-rw-r--r--synapse/app/appservice.py5
-rw-r--r--synapse/app/client_reader.py2
-rw-r--r--synapse/app/event_creator.py2
-rw-r--r--synapse/app/federation_reader.py2
-rw-r--r--synapse/app/federation_sender.py2
-rw-r--r--synapse/app/frontend_proxy.py2
-rwxr-xr-xsynapse/app/homeserver.py2
-rw-r--r--synapse/app/media_repository.py2
-rw-r--r--synapse/app/pusher.py2
-rw-r--r--synapse/app/synchrotron.py2
-rwxr-xr-xsynapse/app/synctl.py4
-rw-r--r--synapse/app/user_dir.py2
-rw-r--r--synapse/appservice/__init__.py6
-rw-r--r--synapse/appservice/api.py22
-rw-r--r--synapse/crypto/keyring.py31
-rw-r--r--synapse/event_auth.py109
-rw-r--r--synapse/federation/send_queue.py63
-rw-r--r--synapse/federation/transaction_queue.py4
-rw-r--r--synapse/handlers/auth.py17
-rw-r--r--synapse/handlers/deactivate_account.py27
-rw-r--r--synapse/handlers/federation.py6
-rw-r--r--synapse/handlers/identity.py56
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--synapse/handlers/register.py5
-rw-r--r--synapse/http/__init__.py13
-rw-r--r--synapse/http/client.py9
-rw-r--r--synapse/http/matrixfederationclient.py25
-rw-r--r--synapse/http/request_metrics.py10
-rw-r--r--synapse/http/site.py9
-rw-r--r--synapse/metrics/__init__.py23
-rw-r--r--synapse/push/pusherpool.py3
-rw-r--r--synapse/python_dependencies.py4
-rw-r--r--synapse/rest/client/v1/admin.py12
-rw-r--r--synapse/rest/client/v1/register.py7
-rw-r--r--synapse/rest/client/v2_alpha/account.py20
-rw-r--r--synapse/rest/client/v2_alpha/register.py3
-rw-r--r--synapse/state.py4
-rw-r--r--synapse/storage/registration.py9
-rw-r--r--synapse/storage/roommember.py26
-rw-r--r--synapse/storage/state.py6
-rw-r--r--synapse/util/async.py10
-rw-r--r--synapse/util/caches/__init__.py10
-rw-r--r--synapse/util/caches/descriptors.py4
-rw-r--r--synapse/util/caches/stream_change_cache.py57
46 files changed, 393 insertions, 255 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 78fc63aa49..faa183a99e 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,4 +17,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.31.1"
+__version__ = "0.31.2"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 06fa38366d..66639b0089 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -655,7 +655,7 @@ class Auth(object):
             auth_events[(EventTypes.PowerLevels, "")] = power_level_event
 
         send_level = event_auth.get_send_level(
-            EventTypes.Aliases, "", auth_events
+            EventTypes.Aliases, "", power_level_event,
         )
         user_level = event_auth.get_user_power_level(user_id, auth_events)
 
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index dd114dee07..4319ddce03 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -23,6 +23,7 @@ from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
@@ -62,7 +63,7 @@ class AppserviceServer(HomeServer):
         for res in listener_config["resources"]:
             for name in res["names"]:
                 if name == "metrics":
-                    resources[METRICS_PREFIX] = MetricsResource(self)
+                    resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -97,7 +98,7 @@ class AppserviceServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 85dada7f9f..654ddb8414 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -122,7 +122,7 @@ class ClientReaderServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 5ca77c0f1a..441467093a 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -138,7 +138,7 @@ class EventCreatorServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 2a1995d0cd..b2415cc671 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -111,7 +111,7 @@ class FederationReaderServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 81ad574043..13d2b70053 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -125,7 +125,7 @@ class FederationSenderServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 5a164a7a95..d2bae4ad03 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -176,7 +176,7 @@ class FrontendProxyServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 714f98a3e0..f855925fc8 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -266,7 +266,7 @@ class SynapseHomeServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 006bba80a8..19a682cce3 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -118,7 +118,7 @@ class MediaRepositoryServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 64df47f9cc..13cfbd08b0 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -128,7 +128,7 @@ class PusherServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 6808d6d3e0..82f06ea185 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 712dfa870e..56ae086128 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -171,6 +171,10 @@ def main():
     if cache_factor:
         os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
 
+    cache_factors = config.get("synctl_cache_factors", {})
+    for cache_name, factor in cache_factors.iteritems():
+        os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
     worker_configfiles = []
     if options.worker:
         start_stop_synapse = False
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index ada1c13cec..f5726e3df6 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -150,7 +150,7 @@ class UserDirectoryServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 5fdb579723..d1c598622a 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -292,4 +292,8 @@ class ApplicationService(object):
         return self.rate_limited
 
     def __str__(self):
-        return "ApplicationService: %s" % (self.__dict__,)
+        # copy dictionary and redact token fields so they don't get logged
+        dict_copy = self.__dict__.copy()
+        dict_copy["token"] = "<redacted>"
+        dict_copy["hs_token"] = "<redacted>"
+        return "ApplicationService: %s" % (dict_copy,)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 00efff1464..47251fb6ad 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -24,8 +24,27 @@ from synapse.types import ThirdPartyInstanceID
 import logging
 import urllib
 
+from prometheus_client import Counter
+
 logger = logging.getLogger(__name__)
 
+sent_transactions_counter = Counter(
+    "synapse_appservice_api_sent_transactions",
+    "Number of /transactions/ requests sent",
+    ["service"]
+)
+
+failed_transactions_counter = Counter(
+    "synapse_appservice_api_failed_transactions",
+    "Number of /transactions/ requests that failed to send",
+    ["service"]
+)
+
+sent_events_counter = Counter(
+    "synapse_appservice_api_sent_events",
+    "Number of events sent to the AS",
+    ["service"]
+)
 
 HOUR_IN_MS = 60 * 60 * 1000
 
@@ -219,12 +238,15 @@ class ApplicationServiceApi(SimpleHttpClient):
                 args={
                     "access_token": service.hs_token
                 })
+            sent_transactions_counter.labels(service.id).inc()
+            sent_events_counter.labels(service.id).inc(len(events))
             defer.returnValue(True)
             return
         except CodeMessageException as e:
             logger.warning("push_bulk to %s received %s", uri, e.code)
         except Exception as ex:
             logger.warning("push_bulk to %s threw exception %s", uri, ex)
+        failed_transactions_counter.labels(service.id).inc()
         defer.returnValue(False)
 
     def _serialize(self, events):
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 22ee0fc93f..9b17ef0a08 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -27,10 +27,12 @@ from synapse.util.metrics import Measure
 from twisted.internet import defer
 
 from signedjson.sign import (
-    verify_signed_json, signature_ids, sign_json, encode_canonical_json
+    verify_signed_json, signature_ids, sign_json, encode_canonical_json,
+    SignatureVerifyException,
 )
 from signedjson.key import (
-    is_signing_algorithm_supported, decode_verify_key_bytes
+    is_signing_algorithm_supported, decode_verify_key_bytes,
+    encode_verify_key_base64,
 )
 from unpaddedbase64 import decode_base64, encode_base64
 
@@ -56,7 +58,7 @@ Attributes:
     key_ids(set(str)): The set of key_ids to that could be used to verify the
         JSON object
     json_object(dict): The JSON object to verify.
-    deferred(twisted.internet.defer.Deferred):
+    deferred(Deferred[str, str, nacl.signing.VerifyKey]):
         A deferred (server_name, key_id, verify_key) tuple that resolves when
         a verify key has been fetched. The deferreds' callbacks are run with no
         logcontext.
@@ -736,6 +738,17 @@ class Keyring(object):
 
 @defer.inlineCallbacks
 def _handle_key_deferred(verify_request):
+    """Waits for the key to become available, and then performs a verification
+
+    Args:
+        verify_request (VerifyKeyRequest):
+
+    Returns:
+        Deferred[None]
+
+    Raises:
+        SynapseError if there was a problem performing the verification
+    """
     server_name = verify_request.server_name
     try:
         with PreserveLoggingContext():
@@ -768,11 +781,17 @@ def _handle_key_deferred(verify_request):
     ))
     try:
         verify_signed_json(json_object, server_name, verify_key)
-    except Exception:
+    except SignatureVerifyException as e:
+        logger.debug(
+            "Error verifying signature for %s:%s:%s with key %s: %s",
+            server_name, verify_key.alg, verify_key.version,
+            encode_verify_key_base64(verify_key),
+            str(e),
+        )
         raise SynapseError(
             401,
-            "Invalid signature for server %s with key %s:%s" % (
-                server_name, verify_key.alg, verify_key.version
+            "Invalid signature for server %s with key %s:%s: %s" % (
+                server_name, verify_key.alg, verify_key.version, str(e),
             ),
             Codes.UNAUTHORIZED,
         )
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index eaf9cecde6..f512d88145 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -34,9 +34,11 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
         event: the event being checked.
         auth_events (dict: event-key -> event): the existing room state.
 
+    Raises:
+        AuthError if the checks fail
 
     Returns:
-        True if the auth checks pass.
+         if the auth checks pass.
     """
     if do_size_check:
         _check_size_limits(event)
@@ -71,7 +73,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
         # Oh, we don't know what the state of the room was, so we
         # are trusting that this is allowed (at least for now)
         logger.warn("Trusting event: %s", event.event_id)
-        return True
+        return
 
     if event.type == EventTypes.Create:
         room_id_domain = get_domain_from_id(event.room_id)
@@ -81,7 +83,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
                 "Creation event's room_id domain does not match sender's"
             )
         # FIXME
-        return True
+        logger.debug("Allowing! %s", event)
+        return
 
     creation_event = auth_events.get((EventTypes.Create, ""), None)
 
@@ -118,7 +121,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
                 403,
                 "Alias event's state_key does not match sender's domain"
             )
-        return True
+        logger.debug("Allowing! %s", event)
+        return
 
     if logger.isEnabledFor(logging.DEBUG):
         logger.debug(
@@ -127,14 +131,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
         )
 
     if event.type == EventTypes.Member:
-        allowed = _is_membership_change_allowed(
-            event, auth_events
-        )
-        if allowed:
-            logger.debug("Allowing! %s", event)
-        else:
-            logger.debug("Denying! %s", event)
-        return allowed
+        _is_membership_change_allowed(event, auth_events)
+        logger.debug("Allowing! %s", event)
+        return
 
     _check_event_sender_in_room(event, auth_events)
 
@@ -153,7 +152,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
                 )
             )
         else:
-            return True
+            logger.debug("Allowing! %s", event)
+            return
 
     _can_send_event(event, auth_events)
 
@@ -200,7 +200,7 @@ def _is_membership_change_allowed(event, auth_events):
         create = auth_events.get(key)
         if create and event.prev_events[0][0] == create.event_id:
             if create.content["creator"] == event.state_key:
-                return True
+                return
 
     target_user_id = event.state_key
 
@@ -265,13 +265,13 @@ def _is_membership_change_allowed(event, auth_events):
             raise AuthError(
                 403, "%s is banned from the room" % (target_user_id,)
             )
-        return True
+        return
 
     if Membership.JOIN != membership:
         if (caller_invited
                 and Membership.LEAVE == membership
                 and target_user_id == event.user_id):
-            return True
+            return
 
         if not caller_in_room:  # caller isn't joined
             raise AuthError(
@@ -334,8 +334,6 @@ def _is_membership_change_allowed(event, auth_events):
     else:
         raise AuthError(500, "Unknown membership %s" % membership)
 
-    return True
-
 
 def _check_event_sender_in_room(event, auth_events):
     key = (EventTypes.Member, event.user_id, )
@@ -355,35 +353,46 @@ def _check_joined_room(member, user_id, room_id):
         ))
 
 
-def get_send_level(etype, state_key, auth_events):
-    key = (EventTypes.PowerLevels, "", )
-    send_level_event = auth_events.get(key)
-    send_level = None
-    if send_level_event:
-        send_level = send_level_event.content.get("events", {}).get(
-            etype
-        )
-        if send_level is None:
-            if state_key is not None:
-                send_level = send_level_event.content.get(
-                    "state_default", 50
-                )
-            else:
-                send_level = send_level_event.content.get(
-                    "events_default", 0
-                )
+def get_send_level(etype, state_key, power_levels_event):
+    """Get the power level required to send an event of a given type
+
+    The federation spec [1] refers to this as "Required Power Level".
+
+    https://matrix.org/docs/spec/server_server/unstable.html#definitions
 
-    if send_level:
-        send_level = int(send_level)
+    Args:
+        etype (str): type of event
+        state_key (str|None): state_key of state event, or None if it is not
+            a state event.
+        power_levels_event (synapse.events.EventBase|None): power levels event
+            in force at this point in the room
+    Returns:
+        int: power level required to send this event.
+    """
+
+    if power_levels_event:
+        power_levels_content = power_levels_event.content
     else:
-        send_level = 0
+        power_levels_content = {}
+
+    # see if we have a custom level for this event type
+    send_level = power_levels_content.get("events", {}).get(etype)
+
+    # otherwise, fall back to the state_default/events_default.
+    if send_level is None:
+        if state_key is not None:
+            send_level = power_levels_content.get("state_default", 50)
+        else:
+            send_level = power_levels_content.get("events_default", 0)
 
-    return send_level
+    return int(send_level)
 
 
 def _can_send_event(event, auth_events):
+    power_levels_event = _get_power_level_event(auth_events)
+
     send_level = get_send_level(
-        event.type, event.get("state_key", None), auth_events
+        event.type, event.get("state_key"), power_levels_event,
     )
     user_level = get_user_power_level(event.user_id, auth_events)
 
@@ -524,13 +533,22 @@ def _check_power_levels(event, auth_events):
 
 
 def _get_power_level_event(auth_events):
-    key = (EventTypes.PowerLevels, "", )
-    return auth_events.get(key)
+    return auth_events.get((EventTypes.PowerLevels, ""))
 
 
 def get_user_power_level(user_id, auth_events):
-    power_level_event = _get_power_level_event(auth_events)
+    """Get a user's power level
+
+    Args:
+        user_id (str): user's id to look up in power_levels
+        auth_events (dict[(str, str), synapse.events.EventBase]):
+            state in force at this point in the room (or rather, a subset of
+            it including at least the create event and power levels event.
 
+    Returns:
+        int: the user's power level in this room.
+    """
+    power_level_event = _get_power_level_event(auth_events)
     if power_level_event:
         level = power_level_event.content.get("users", {}).get(user_id)
         if not level:
@@ -541,6 +559,11 @@ def get_user_power_level(user_id, auth_events):
         else:
             return int(level)
     else:
+        # if there is no power levels event, the creator gets 100 and everyone
+        # else gets 0.
+
+        # some things which call this don't pass the create event: hack around
+        # that.
         key = (EventTypes.Create, "", )
         create_event = auth_events.get(key)
         if (create_event is not None and
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 9f1142b5a9..1d5c0f3797 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
 from synapse.metrics import LaterGauge
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 from collections import namedtuple
 
 import logging
@@ -55,19 +55,19 @@ class FederationRemoteSendQueue(object):
         self.is_mine_id = hs.is_mine_id
 
         self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
-        self.presence_changed = sorteddict()  # Stream position -> user_id
+        self.presence_changed = SortedDict()  # Stream position -> user_id
 
         self.keyed_edu = {}  # (destination, key) -> EDU
-        self.keyed_edu_changed = sorteddict()  # stream position -> (destination, key)
+        self.keyed_edu_changed = SortedDict()  # stream position -> (destination, key)
 
-        self.edus = sorteddict()  # stream position -> Edu
+        self.edus = SortedDict()  # stream position -> Edu
 
-        self.failures = sorteddict()  # stream position -> (destination, Failure)
+        self.failures = SortedDict()  # stream position -> (destination, Failure)
 
-        self.device_messages = sorteddict()  # stream position -> destination
+        self.device_messages = SortedDict()  # stream position -> destination
 
         self.pos = 1
-        self.pos_time = sorteddict()
+        self.pos_time = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
         # we make a new function, so we need to make a new function so the inner
@@ -98,7 +98,7 @@ class FederationRemoteSendQueue(object):
         now = self.clock.time_msec()
 
         keys = self.pos_time.keys()
-        time = keys.bisect_left(now - FIVE_MINUTES_AGO)
+        time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
         if not keys[:time]:
             return
 
@@ -113,7 +113,7 @@ class FederationRemoteSendQueue(object):
         with Measure(self.clock, "send_queue._clear"):
             # Delete things out of presence maps
             keys = self.presence_changed.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.presence_changed.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.presence_changed[key]
 
@@ -131,7 +131,7 @@ class FederationRemoteSendQueue(object):
 
             # Delete things out of keyed edus
             keys = self.keyed_edu_changed.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.keyed_edu_changed.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.keyed_edu_changed[key]
 
@@ -145,19 +145,19 @@ class FederationRemoteSendQueue(object):
 
             # Delete things out of edu map
             keys = self.edus.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.edus.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.edus[key]
 
             # Delete things out of failure map
             keys = self.failures.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.failures.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.failures[key]
 
             # Delete things out of device map
             keys = self.device_messages.keys()
-            i = keys.bisect_left(position_to_delete)
+            i = self.device_messages.bisect_left(position_to_delete)
             for key in keys[:i]:
                 del self.device_messages[key]
 
@@ -250,13 +250,12 @@ class FederationRemoteSendQueue(object):
             self._clear_queue_before_pos(federation_ack)
 
         # Fetch changed presence
-        keys = self.presence_changed.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
+        i = self.presence_changed.bisect_right(from_token)
+        j = self.presence_changed.bisect_right(to_token) + 1
         dest_user_ids = [
             (pos, user_id)
-            for pos in keys[i:j]
-            for user_id in self.presence_changed[pos]
+            for pos, user_id_list in self.presence_changed.items()[i:j]
+            for user_id in user_id_list
         ]
 
         for (key, user_id) in dest_user_ids:
@@ -265,13 +264,12 @@ class FederationRemoteSendQueue(object):
             )))
 
         # Fetch changes keyed edus
-        keys = self.keyed_edu_changed.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
+        i = self.keyed_edu_changed.bisect_right(from_token)
+        j = self.keyed_edu_changed.bisect_right(to_token) + 1
         # We purposefully clobber based on the key here, python dict comprehensions
         # always use the last value, so this will correctly point to the last
         # stream position.
-        keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
+        keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
 
         for ((destination, edu_key), pos) in iteritems(keyed_edus):
             rows.append((pos, KeyedEduRow(
@@ -280,19 +278,17 @@ class FederationRemoteSendQueue(object):
             )))
 
         # Fetch changed edus
-        keys = self.edus.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        edus = ((k, self.edus[k]) for k in keys[i:j])
+        i = self.edus.bisect_right(from_token)
+        j = self.edus.bisect_right(to_token) + 1
+        edus = self.edus.items()[i:j]
 
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
         # Fetch changed failures
-        keys = self.failures.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        failures = ((k, self.failures[k]) for k in keys[i:j])
+        i = self.failures.bisect_right(from_token)
+        j = self.failures.bisect_right(to_token) + 1
+        failures = self.failures.items()[i:j]
 
         for (pos, (destination, failure)) in failures:
             rows.append((pos, FailureRow(
@@ -301,10 +297,9 @@ class FederationRemoteSendQueue(object):
             )))
 
         # Fetch changed device messages
-        keys = self.device_messages.keys()
-        i = keys.bisect_right(from_token)
-        j = keys.bisect_right(to_token) + 1
-        device_messages = {self.device_messages[k]: k for k in keys[i:j]}
+        i = self.device_messages.bisect_right(from_token)
+        j = self.device_messages.bisect_right(to_token) + 1
+        device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
 
         for (destination, pos) in iteritems(device_messages):
             rows.append((pos, DeviceRow(
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index f0aeb5a0d3..bcbce7f6eb 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -21,7 +21,6 @@ from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException, FederationDeniedError
 from synapse.util import logcontext, PreserveLoggingContext
-from synapse.util.async import run_on_reactor
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
 from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
@@ -451,9 +450,6 @@ class TransactionQueue(object):
             # hence why we throw the result away.
             yield get_retry_limiter(destination, self.clock, self.store)
 
-            # XXX: what's this for?
-            yield run_on_reactor()
-
             pending_pdus = []
             while True:
                 device_message_edus, device_stream_id, dev_list_id = (
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 3c0051586d..dabc744890 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,6 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 from twisted.internet import defer, threads
 
 from ._base import BaseHandler
@@ -23,7 +24,6 @@ from synapse.api.errors import (
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
-from synapse.util.async import run_on_reactor
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logcontext import make_deferred_yieldable
 
@@ -423,15 +423,11 @@ class AuthHandler(BaseHandler):
     def _check_msisdn(self, authdict, _):
         return self._check_threepid('msisdn', authdict)
 
-    @defer.inlineCallbacks
     def _check_dummy_auth(self, authdict, _):
-        yield run_on_reactor()
-        defer.returnValue(True)
+        return defer.succeed(True)
 
     @defer.inlineCallbacks
     def _check_threepid(self, medium, authdict):
-        yield run_on_reactor()
-
         if 'threepid_creds' not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
@@ -825,6 +821,15 @@ class AuthHandler(BaseHandler):
         if medium == 'email':
             address = address.lower()
 
+        identity_handler = self.hs.get_handlers().identity_handler
+        yield identity_handler.unbind_threepid(
+            user_id,
+            {
+                'medium': medium,
+                'address': address,
+            },
+        )
+
         ret = yield self.store.user_delete_threepid(
             user_id, medium, address,
         )
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index c5e92f6214..8ec5ba2012 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,6 +17,7 @@ from twisted.internet import defer, reactor
 from ._base import BaseHandler
 from synapse.types import UserID, create_requester
 from synapse.util.logcontext import run_in_background
+from synapse.api.errors import SynapseError
 
 import logging
 
@@ -30,6 +31,7 @@ class DeactivateAccountHandler(BaseHandler):
         self._auth_handler = hs.get_auth_handler()
         self._device_handler = hs.get_device_handler()
         self._room_member_handler = hs.get_room_member_handler()
+        self._identity_handler = hs.get_handlers().identity_handler
         self.user_directory_handler = hs.get_user_directory_handler()
 
         # Flag that indicates whether the process to part users from rooms is running
@@ -52,14 +54,35 @@ class DeactivateAccountHandler(BaseHandler):
         # FIXME: Theoretically there is a race here wherein user resets
         # password using threepid.
 
-        # first delete any devices belonging to the user, which will also
+        # delete threepids first. We remove these from the IS so if this fails,
+        # leave the user still active so they can try again.
+        # Ideally we would prevent password resets and then do this in the
+        # background thread.
+        threepids = yield self.store.user_get_threepids(user_id)
+        for threepid in threepids:
+            try:
+                yield self._identity_handler.unbind_threepid(
+                    user_id,
+                    {
+                        'medium': threepid['medium'],
+                        'address': threepid['address'],
+                    },
+                )
+            except Exception:
+                # Do we want this to be a fatal error or should we carry on?
+                logger.exception("Failed to remove threepid from ID server")
+                raise SynapseError(400, "Failed to remove threepid from ID server")
+            yield self.store.user_delete_threepid(
+                user_id, threepid['medium'], threepid['address'],
+            )
+
+        # delete any devices belonging to the user, which will also
         # delete corresponding access tokens.
         yield self._device_handler.delete_all_devices_for_user(user_id)
         # then delete any remaining access tokens which weren't associated with
         # a device.
         yield self._auth_handler.delete_access_tokens_for_user(user_id)
 
-        yield self.store.user_delete_threepids(user_id)
         yield self.store.user_set_password_hash(user_id, None)
 
         # Add the user to a table of users pending deactivation (ie.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 495ac4c648..af94bf33bc 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -39,7 +39,7 @@ from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError, logcontext
 from synapse.util.metrics import measure_func
 from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, Linearizer
+from synapse.util.async import Linearizer
 from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
     compute_event_signature, add_hashes_and_signatures,
@@ -1381,8 +1381,6 @@ class FederationHandler(BaseHandler):
     def get_state_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
-
         state_groups = yield self.store.get_state_groups(
             room_id, [event_id]
         )
@@ -1425,8 +1423,6 @@ class FederationHandler(BaseHandler):
     def get_state_ids_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
-
         state_groups = yield self.store.get_state_groups_ids(
             room_id, [event_id]
         )
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 91a0898860..f00dfe1d3e 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -26,7 +27,6 @@ from synapse.api.errors import (
     MatrixCodeMessageException, CodeMessageException
 )
 from ._base import BaseHandler
-from synapse.util.async import run_on_reactor
 from synapse.api.errors import SynapseError, Codes
 
 logger = logging.getLogger(__name__)
@@ -38,6 +38,7 @@ class IdentityHandler(BaseHandler):
         super(IdentityHandler, self).__init__(hs)
 
         self.http_client = hs.get_simple_http_client()
+        self.federation_http_client = hs.get_http_client()
 
         self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
         self.trust_any_id_server_just_for_testing_do_not_use = (
@@ -60,8 +61,6 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def threepid_from_creds(self, creds):
-        yield run_on_reactor()
-
         if 'id_server' in creds:
             id_server = creds['id_server']
         elif 'idServer' in creds:
@@ -104,7 +103,6 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def bind_threepid(self, creds, mxid):
-        yield run_on_reactor()
         logger.debug("binding threepid %r to %s", creds, mxid)
         data = None
 
@@ -139,9 +137,53 @@ class IdentityHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
-        yield run_on_reactor()
+    def unbind_threepid(self, mxid, threepid):
+        """
+        Removes a binding from an identity server
+        Args:
+            mxid (str): Matrix user ID of binding to be removed
+            threepid (dict): Dict with medium & address of binding to be removed
+
+        Returns:
+            Deferred[bool]: True on success, otherwise False
+        """
+        logger.debug("unbinding threepid %r from %s", threepid, mxid)
+        if not self.trusted_id_servers:
+            logger.warn("Can't unbind threepid: no trusted ID servers set in config")
+            defer.returnValue(False)
+
+        # We don't track what ID server we added 3pids on (perhaps we ought to)
+        # but we assume that any of the servers in the trusted list are in the
+        # same ID server federation, so we can pick any one of them to send the
+        # deletion request to.
+        id_server = next(iter(self.trusted_id_servers))
+
+        url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
+        content = {
+            "mxid": mxid,
+            "threepid": threepid,
+        }
+        headers = {}
+        # we abuse the federation http client to sign the request, but we have to send it
+        # using the normal http client since we don't want the SRV lookup and want normal
+        # 'browser-like' HTTPS.
+        self.federation_http_client.sign_request(
+            destination=None,
+            method='POST',
+            url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'),
+            headers_dict=headers,
+            content=content,
+            destination_is=id_server,
+        )
+        yield self.http_client.post_json_get_json(
+            url,
+            content,
+            headers,
+        )
+        defer.returnValue(True)
 
+    @defer.inlineCallbacks
+    def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
@@ -176,8 +218,6 @@ class IdentityHandler(BaseHandler):
             self, id_server, country, phone_number,
             client_secret, send_attempt, **kwargs
     ):
-        yield run_on_reactor()
-
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1cb81b6cf8..18dcc6d196 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -36,7 +36,7 @@ from synapse.events.validator import EventValidator
 from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
-from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
+from synapse.util.async import ReadWriteLock, Limiter
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import frozendict_json_encoder
@@ -959,9 +959,7 @@ class EventCreationHandler(object):
             event_stream_id, max_stream_id
         )
 
-        @defer.inlineCallbacks
         def _notify():
-            yield run_on_reactor()
             try:
                 self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7e52adda3c..e76ef5426d 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -24,7 +24,7 @@ from synapse.api.errors import (
 from synapse.http.client import CaptchaServerHttpClient
 from synapse import types
 from synapse.types import UserID, create_requester, RoomID, RoomAlias
-from synapse.util.async import run_on_reactor, Linearizer
+from synapse.util.async import Linearizer
 from synapse.util.threepids import check_3pid_allowed
 from ._base import BaseHandler
 
@@ -139,7 +139,6 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield run_on_reactor()
         password_hash = None
         if password:
             password_hash = yield self.auth_handler().hash(password)
@@ -431,8 +430,6 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield run_on_reactor()
-
         if localpart is None:
             raise SynapseError(400, "Request must include user id")
 
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 054372e179..58ef8d3ce4 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -13,6 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import re
+
 from twisted.internet.defer import CancelledError
 from twisted.python import failure
 
@@ -34,3 +36,14 @@ def cancelled_to_request_timed_out_error(value, timeout):
         value.trap(CancelledError)
         raise RequestTimedOutError()
     return value
+
+
+ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+
+
+def redact_uri(uri):
+    """Strips access tokens from the uri replaces with <redacted>"""
+    return ACCESS_TOKEN_RE.sub(
+        br'\1<redacted>\3',
+        uri
+    )
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 4d4eee3d64..8064a84c5c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -19,7 +19,7 @@ from OpenSSL.SSL import VERIFY_NONE
 from synapse.api.errors import (
     CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
 )
-from synapse.http import cancelled_to_request_timed_out_error
+from synapse.http import cancelled_to_request_timed_out_error, redact_uri
 from synapse.util.async import add_timeout_to_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
@@ -90,7 +90,8 @@ class SimpleHttpClient(object):
         # counters to it
         outgoing_requests_counter.labels(method).inc()
 
-        logger.info("Sending request %s %s", method, uri)
+        # log request but strip `access_token` (AS requests for example include this)
+        logger.info("Sending request %s %s", method, redact_uri(uri))
 
         try:
             request_deferred = self.agent.request(
@@ -105,14 +106,14 @@ class SimpleHttpClient(object):
             incoming_responses_counter.labels(method, response.code).inc()
             logger.info(
                 "Received response to  %s %s: %s",
-                method, uri, response.code
+                method, redact_uri(uri), response.code
             )
             defer.returnValue(response)
         except Exception as e:
             incoming_responses_counter.labels(method, "ERR").inc()
             logger.info(
                 "Error sending request to  %s %s: %s %s",
-                method, uri, type(e).__name__, e.message
+                method, redact_uri(uri), type(e).__name__, e.message
             )
             raise e
 
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 821aed362b..993dc06e02 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -260,14 +260,35 @@ class MatrixFederationHttpClient(object):
             defer.returnValue(response)
 
     def sign_request(self, destination, method, url_bytes, headers_dict,
-                     content=None):
+                     content=None, destination_is=None):
+        """
+        Signs a request by adding an Authorization header to headers_dict
+        Args:
+            destination (bytes|None): The desination home server of the request.
+                May be None if the destination is an identity server, in which case
+                destination_is must be non-None.
+            method (bytes): The HTTP method of the request
+            url_bytes (bytes): The URI path of the request
+            headers_dict (dict): Dictionary of request headers to append to
+            content (bytes): The body of the request
+            destination_is (bytes): As 'destination', but if the destination is an
+                identity server
+
+        Returns:
+            None
+        """
         request = {
             "method": method,
             "uri": url_bytes,
             "origin": self.server_name,
-            "destination": destination,
         }
 
+        if destination is not None:
+            request["destination"] = destination
+
+        if destination_is is not None:
+            request["destination_is"] = destination_is
+
         if content is not None:
             request["content"] = content
 
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index dc06f6c443..1b711ca2de 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -117,13 +117,17 @@ def _get_in_flight_counts():
     Returns:
         dict[tuple[str, str], int]
     """
-    for rm in _in_flight_requests:
+    # Cast to a list to prevent it changing while the Prometheus
+    # thread is collecting metrics
+    reqs = list(_in_flight_requests)
+
+    for rm in reqs:
         rm.update_metrics()
 
     # Map from (method, name) -> int, the number of in flight requests of that
     # type
     counts = {}
-    for rm in _in_flight_requests:
+    for rm in reqs:
         key = (rm.method, rm.name,)
         counts[key] = counts.get(key, 0) + 1
 
@@ -131,7 +135,7 @@ def _get_in_flight_counts():
 
 
 LaterGauge(
-    "synapse_http_request_metrics_in_flight_requests_count",
+    "synapse_http_server_in_flight_requests_count",
     "",
     ["method", "servlet"],
     _get_in_flight_counts,
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 60299657b9..2664006f8c 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -14,18 +14,16 @@
 
 import contextlib
 import logging
-import re
 import time
 
 from twisted.web.server import Site, Request
 
+from synapse.http import redact_uri
 from synapse.http.request_metrics import RequestMetrics
 from synapse.util.logcontext import LoggingContext
 
 logger = logging.getLogger(__name__)
 
-ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
-
 _next_request_seq = 0
 
 
@@ -69,10 +67,7 @@ class SynapseRequest(Request):
         return "%s-%i" % (self.method, self.request_seq)
 
     def get_redacted_uri(self):
-        return ACCESS_TOKEN_RE.sub(
-            br'\1<redacted>\3',
-            self.uri
-        )
+        return redact_uri(self.uri)
 
     def get_user_agent(self):
         return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 429e79c472..7f76969467 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -62,7 +62,7 @@ class LaterGauge(object):
             calls = self.caller()
         except Exception:
             logger.exception(
-                "Exception running callback for LaterGuage(%s)",
+                "Exception running callback for LaterGauge(%s)",
                 self.name,
             )
             yield g
@@ -190,6 +190,22 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
 # finished being processed.
 event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
 
+last_ticked = time.time()
+
+
+class ReactorLastSeenMetric(object):
+
+    def collect(self):
+        cm = GaugeMetricFamily(
+            "python_twisted_reactor_last_seen",
+            "Seconds since the Twisted reactor was last seen",
+        )
+        cm.add_metric([], time.time() - last_ticked)
+        yield cm
+
+
+REGISTRY.register(ReactorLastSeenMetric())
+
 
 def runUntilCurrentTimer(func):
 
@@ -222,6 +238,11 @@ def runUntilCurrentTimer(func):
         tick_time.observe(end - start)
         pending_calls_metric.observe(num_pending)
 
+        # Update the time we last ticked, for the metric to test whether
+        # Synapse's reactor has frozen
+        global last_ticked
+        last_ticked = end
+
         if running_on_pypy:
             return ret
 
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 750d11ca38..36bb5bbc65 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,7 +19,6 @@ import logging
 from twisted.internet import defer
 
 from synapse.push.pusher import PusherFactory
-from synapse.util.async import run_on_reactor
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
@@ -125,7 +124,6 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_id, max_stream_id):
-        yield run_on_reactor()
         try:
             users_affected = yield self.store.get_push_action_users_in_range(
                 min_stream_id, max_stream_id
@@ -151,7 +149,6 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
-        yield run_on_reactor()
         try:
             # Need to subtract 1 from the minimum because the lower bound here
             # is not inclusive
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 478c497722..faf6dfdb8d 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -50,14 +50,16 @@ REQUIREMENTS = {
     "bcrypt": ["bcrypt>=3.1.0"],
     "pillow": ["PIL"],
     "pydenticon": ["pydenticon"],
-    "blist": ["blist"],
+    "sortedcontainers": ["sortedcontainers"],
     "pysaml2>=3.0.0": ["saml2>=3.0.0"],
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
     "six": ["six"],
     "prometheus_client": ["prometheus_client"],
+    "attr": ["attr"],
 }
+
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
         "matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"],
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 6835a7bba2..b8665a45eb 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -169,16 +169,12 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                 yield self.store.find_first_stream_ordering_after_ts(ts)
             )
 
-            room_event_after_stream_ordering = (
+            r = (
                 yield self.store.get_room_event_after_stream_ordering(
                     room_id, stream_ordering,
                 )
             )
-            if room_event_after_stream_ordering:
-                token = yield self.store.get_topological_token_for_event(
-                    room_event_after_stream_ordering,
-                )
-            else:
+            if not r:
                 logger.warn(
                     "[purge] purging events not possible: No event found "
                     "(received_ts %i => stream_ordering %i)",
@@ -189,8 +185,10 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                     "there is no event to be purged",
                     errcode=Codes.NOT_FOUND,
                 )
+            (stream, topo, _event_id) = r
+            token = "t%d-%d" % (topo, stream)
             logger.info(
-                "[purge] purging up to token %d (received_ts %i => "
+                "[purge] purging up to token %s (received_ts %i => "
                 "stream_ordering %i)",
                 token, ts, stream_ordering,
             )
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 9b3022e0b0..c10320dedf 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -24,8 +24,6 @@ import synapse.util.stringutils as stringutils
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.types import create_requester
 
-from synapse.util.async import run_on_reactor
-
 from hashlib import sha1
 import hmac
 import logging
@@ -272,7 +270,6 @@ class RegisterRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def _do_password(self, request, register_json, session):
-        yield run_on_reactor()
         if (self.hs.config.enable_registration_captcha and
                 not session[LoginType.RECAPTCHA]):
             # captcha should've been done by this stage!
@@ -333,8 +330,6 @@ class RegisterRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def _do_shared_secret(self, request, register_json, session):
-        yield run_on_reactor()
-
         if not isinstance(register_json.get("mac", None), string_types):
             raise SynapseError(400, "Expected mac.")
         if not isinstance(register_json.get("user", None), string_types):
@@ -423,8 +418,6 @@ class CreateUserRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def _do_create(self, requester, user_json):
-        yield run_on_reactor()
-
         if "localpart" not in user_json:
             raise SynapseError(400, "Expected 'localpart' key.")
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 30523995af..e1281cfbb6 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -24,7 +24,6 @@ from synapse.http.servlet import (
     RestServlet, assert_params_in_request,
     parse_json_object_from_request,
 )
-from synapse.util.async import run_on_reactor
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.threepids import check_3pid_allowed
 from ._base import client_v2_patterns, interactive_auth_handler
@@ -300,8 +299,6 @@ class ThreepidRestServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request):
-        yield run_on_reactor()
-
         requester = yield self.auth.get_user_by_req(request)
 
         threepids = yield self.datastore.user_get_threepids(
@@ -312,8 +309,6 @@ class ThreepidRestServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        yield run_on_reactor()
-
         body = parse_json_object_from_request(request)
 
         threePidCreds = body.get('threePidCreds')
@@ -365,8 +360,6 @@ class ThreepidDeleteRestServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        yield run_on_reactor()
-
         body = parse_json_object_from_request(request)
 
         required = ['medium', 'address']
@@ -381,9 +374,16 @@ class ThreepidDeleteRestServlet(RestServlet):
         requester = yield self.auth.get_user_by_req(request)
         user_id = requester.user.to_string()
 
-        yield self.auth_handler.delete_threepid(
-            user_id, body['medium'], body['address']
-        )
+        try:
+            yield self.auth_handler.delete_threepid(
+                user_id, body['medium'], body['address']
+            )
+        except Exception:
+            # NB. This endpoint should succeed if there is nothing to
+            # delete, so it should only throw if something is wrong
+            # that we ought to care about.
+            logger.exception("Failed to remove threepid")
+            raise SynapseError(500, "Failed to remove threepid")
 
         defer.returnValue((200, {}))
 
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 5cab00aea9..97e7c0f7c6 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -32,7 +32,6 @@ from ._base import client_v2_patterns, interactive_auth_handler
 import logging
 import hmac
 from hashlib import sha1
-from synapse.util.async import run_on_reactor
 from synapse.util.ratelimitutils import FederationRateLimiter
 
 from six import string_types
@@ -191,8 +190,6 @@ class RegisterRestServlet(RestServlet):
     @interactive_auth_handler
     @defer.inlineCallbacks
     def on_POST(self, request):
-        yield run_on_reactor()
-
         body = parse_json_object_from_request(request)
 
         kind = "user"
diff --git a/synapse/state.py b/synapse/state.py
index 216418f58d..8098db94b4 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -694,10 +694,10 @@ def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_ma
     return auth_events
 
 
-def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ids,
+def _resolve_with_state(unconflicted_state_ids, conflicted_state_ids, auth_event_ids,
                         state_map):
     conflicted_state = {}
-    for key, event_ids in iteritems(conflicted_state_ds):
+    for key, event_ids in iteritems(conflicted_state_ids):
         events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map]
         if len(events) > 1:
             conflicted_state[key] = events
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c241167fbe..9c9cf46e7f 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -460,15 +460,6 @@ class RegistrationStore(RegistrationWorkerStore,
             defer.returnValue(ret['user_id'])
         defer.returnValue(None)
 
-    def user_delete_threepids(self, user_id):
-        return self._simple_delete(
-            "user_threepids",
-            keyvalues={
-                "user_id": user_id,
-            },
-            desc="user_delete_threepids",
-        )
-
     def user_delete_threepid(self, user_id, medium, address):
         return self._simple_delete(
             "user_threepids",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 7bfc3d91b5..48a88f755e 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -578,7 +578,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
             )
             txn.execute(sql, (user_id, room_id))
 
-            txn.call_after(self.was_forgotten_at.invalidate_all)
             txn.call_after(self.did_forget.invalidate, (user_id, room_id))
             self._invalidate_cache_and_stream(
                 txn, self.who_forgot_in_room, (room_id,)
@@ -609,31 +608,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
         count = yield self.runInteraction("did_forget_membership", f)
         defer.returnValue(count == 0)
 
-    @cachedInlineCallbacks(num_args=3)
-    def was_forgotten_at(self, user_id, room_id, event_id):
-        """Returns whether user_id has elected to discard history for room_id at
-        event_id.
-
-        event_id must be a membership event."""
-        def f(txn):
-            sql = (
-                "SELECT"
-                "  forgotten"
-                " FROM"
-                "  room_memberships"
-                " WHERE"
-                "  user_id = ?"
-                " AND"
-                "  room_id = ?"
-                " AND"
-                "  event_id = ?"
-            )
-            txn.execute(sql, (user_id, room_id, event_id))
-            rows = txn.fetchall()
-            return rows[0][0]
-        forgot = yield self.runInteraction("did_forget_membership_at", f)
-        defer.returnValue(forgot == 1)
-
     @defer.inlineCallbacks
     def _background_add_membership_profile(self, progress, batch_size):
         target_min_stream_id = progress.get(
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index bdee14a8eb..85b8ec2b8f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -23,7 +23,7 @@ from twisted.internet import defer
 
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.engines import PostgresEngine
-from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
+from synapse.util.caches import intern_string, get_cache_factor_for
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.dictionary_cache import DictionaryCache
 from synapse.util.stringutils import to_ascii
@@ -57,7 +57,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         super(StateGroupWorkerStore, self).__init__(db_conn, hs)
 
         self._state_group_cache = DictionaryCache(
-            "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
+            "*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
         )
 
     @cached(max_entries=100000, iterable=True)
@@ -272,7 +272,7 @@ class StateGroupWorkerStore(SQLBaseStore):
                 for typ in types:
                     if typ[1] is None:
                         where_clauses.append("(type = ?)")
-                        where_args.extend(typ[0])
+                        where_args.append(typ[0])
                         wildcard_types = True
                     else:
                         where_clauses.append("(type = ? AND state_key = ?)")
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 9dd4e6b5bc..b8e57efc54 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 from twisted.internet import defer, reactor
 from twisted.internet.defer import CancelledError
 from twisted.python import failure
@@ -41,13 +40,6 @@ def sleep(seconds):
     defer.returnValue(res)
 
 
-def run_on_reactor():
-    """ This will cause the rest of the function to be invoked upon the next
-    iteration of the main loop
-    """
-    return sleep(0)
-
-
 class ObservableDeferred(object):
     """Wraps a deferred object so that we can add observer deferreds. These
     observer deferreds do not affect the callback chain of the original
@@ -227,7 +219,7 @@ class Linearizer(object):
             # the context manager, but it needs to happen while we hold the
             # lock, and the context manager's exit code must be synchronous,
             # so actually this is the only sensible place.
-            yield run_on_reactor()
+            yield sleep(0)
 
         else:
             logger.info("Acquired uncontended linearizer lock %r for key %r",
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 183faf75a1..900575eb3c 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -22,6 +22,16 @@ import six
 
 CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
 
+
+def get_cache_factor_for(cache_name):
+    env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
+    factor = os.environ.get(env_var)
+    if factor:
+        return float(factor)
+
+    return CACHE_SIZE_FACTOR
+
+
 caches_by_name = {}
 collectors_by_name = {}
 
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index fc1874b65b..65a1042de1 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -17,7 +17,7 @@ import logging
 
 from synapse.util.async import ObservableDeferred
 from synapse.util import unwrapFirstError, logcontext
-from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches import get_cache_factor_for
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
 from synapse.util.stringutils import to_ascii
@@ -313,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase):
             orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
             cache_context=cache_context)
 
-        max_entries = int(max_entries * CACHE_SIZE_FACTOR)
+        max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
 
         self.max_entries = max_entries
         self.tree = tree
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index a7fe0397fa..817118e30f 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,10 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
+from synapse.util import caches
 
 
-from blist import sorteddict
+from sortedcontainers import SortedDict
 import logging
 
 
@@ -32,16 +32,18 @@ class StreamChangeCache(object):
     entities that may have changed since that position. If position key is too
     old then the cache will simply return all given entities.
     """
-    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
-        self._max_size = int(max_size * CACHE_SIZE_FACTOR)
+
+    def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
+        self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
-        self._cache = sorteddict()
+        self._cache = SortedDict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
-        self.metrics = register_cache("cache", self.name, self._cache)
+        self.metrics = caches.register_cache("cache", self.name, self._cache)
 
-        for entity, stream_pos in prefilled_cache.items():
-            self.entity_has_changed(entity, stream_pos)
+        if prefilled_cache:
+            for entity, stream_pos in prefilled_cache.items():
+                self.entity_has_changed(entity, stream_pos)
 
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
@@ -65,22 +67,25 @@ class StreamChangeCache(object):
         return False
 
     def get_entities_changed(self, entities, stream_pos):
-        """Returns subset of entities that have had new things since the
-        given position. If the position is too old it will just return the given list.
+        """
+        Returns subset of entities that have had new things since the given
+        position.  Entities unknown to the cache will be returned.  If the
+        position is too old it will just return the given list.
         """
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
+            not_known_entities = set(entities) - set(self._entity_to_key)
 
-            result = set(
-                self._cache[k] for k in keys[i:]
-            ).intersection(entities)
+            result = (
+                set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
+                .intersection(entities)
+                .union(not_known_entities)
+            )
 
             self.metrics.inc_hits()
         else:
-            result = entities
+            result = set(entities)
             self.metrics.inc_misses()
 
         return result
@@ -90,12 +95,13 @@ class StreamChangeCache(object):
         """
         assert type(stream_pos) is int
 
+        if not self._cache:
+            # If we have no cache, nothing can have changed.
+            return False
+
         if stream_pos >= self._earliest_known_stream_pos:
             self.metrics.inc_hits()
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return i < len(keys)
+            return self._cache.bisect_right(stream_pos) < len(self._cache)
         else:
             self.metrics.inc_misses()
             return True
@@ -107,10 +113,7 @@ class StreamChangeCache(object):
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            keys = self._cache.keys()
-            i = keys.bisect_right(stream_pos)
-
-            return [self._cache[k] for k in keys[i:]]
+            return self._cache.values()[self._cache.bisect_right(stream_pos) :]
         else:
             return None
 
@@ -129,8 +132,10 @@ class StreamChangeCache(object):
             self._entity_to_key[entity] = stream_pos
 
             while len(self._cache) > self._max_size:
-                k, r = self._cache.popitem()
-                self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
+                k, r = self._cache.popitem(0)
+                self._earliest_known_stream_pos = max(
+                    k, self._earliest_known_stream_pos,
+                )
                 self._entity_to_key.pop(r, None)
 
     def get_max_pos_of_last_change(self, entity):