summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2018-06-28 15:16:04 +0100
committerDavid Baker <dave@matrix.org>2018-06-28 15:16:04 +0100
commit94a14b41829f468eff13a90ec08fa60fae21c399 (patch)
tree3645efe340c6855b66ddc3cddddfe2cf60e16ff1 /synapse
parentMedia repo support for content erasure (diff)
parentAttempt to be more performant on PyPy (#3462) (diff)
downloadsynapse-94a14b41829f468eff13a90ec08fa60fae21c399.tar.xz
Merge remote-tracking branch 'origin/develop' into dbkr/media_erasure
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py3
-rw-r--r--synapse/api/auth.py6
-rw-r--r--synapse/api/errors.py3
-rw-r--r--synapse/api/filtering.py3
-rw-r--r--synapse/app/appservice.py5
-rw-r--r--synapse/app/client_reader.py2
-rw-r--r--synapse/app/event_creator.py2
-rw-r--r--synapse/app/federation_reader.py2
-rw-r--r--synapse/app/federation_sender.py2
-rw-r--r--synapse/app/frontend_proxy.py2
-rwxr-xr-xsynapse/app/homeserver.py9
-rw-r--r--synapse/app/media_repository.py2
-rw-r--r--synapse/app/pusher.py2
-rw-r--r--synapse/app/synchrotron.py2
-rw-r--r--synapse/app/user_dir.py2
-rw-r--r--synapse/config/logger.py24
-rw-r--r--synapse/crypto/keyclient.py2
-rw-r--r--synapse/event_auth.py109
-rw-r--r--synapse/federation/federation_server.py2
-rw-r--r--synapse/federation/transaction_queue.py14
-rw-r--r--synapse/handlers/auth.py43
-rw-r--r--synapse/handlers/deactivate_account.py5
-rw-r--r--synapse/handlers/e2e_keys.py7
-rw-r--r--synapse/handlers/federation.py47
-rw-r--r--synapse/handlers/identity.py10
-rw-r--r--synapse/handlers/message.py22
-rw-r--r--synapse/handlers/presence.py4
-rw-r--r--synapse/handlers/register.py5
-rw-r--r--synapse/handlers/room.py8
-rw-r--r--synapse/handlers/search.py14
-rw-r--r--synapse/handlers/sync.py2
-rw-r--r--synapse/handlers/user_directory.py9
-rw-r--r--synapse/http/client.py8
-rw-r--r--synapse/http/endpoint.py18
-rw-r--r--synapse/http/matrixfederationclient.py8
-rw-r--r--synapse/http/request_metrics.py10
-rw-r--r--synapse/http/server.py5
-rw-r--r--synapse/http/servlet.py6
-rw-r--r--synapse/http/site.py5
-rw-r--r--synapse/metrics/__init__.py28
-rw-r--r--synapse/notifier.py3
-rw-r--r--synapse/push/emailpusher.py4
-rw-r--r--synapse/push/httppusher.py6
-rw-r--r--synapse/push/pusherpool.py3
-rw-r--r--synapse/replication/http/send_event.py6
-rw-r--r--synapse/replication/slave/storage/events.py2
-rw-r--r--synapse/replication/tcp/client.py6
-rw-r--r--synapse/replication/tcp/commands.py16
-rw-r--r--synapse/replication/tcp/protocol.py53
-rw-r--r--synapse/replication/tcp/resource.py4
-rw-r--r--synapse/rest/client/v1/admin.py13
-rw-r--r--synapse/rest/client/v1/login.py3
-rw-r--r--synapse/rest/client/v1/register.py7
-rw-r--r--synapse/rest/client/v1/room.py2
-rw-r--r--synapse/rest/client/v2_alpha/account.py7
-rw-r--r--synapse/rest/client/v2_alpha/register.py3
-rw-r--r--synapse/rest/client/v2_alpha/sync.py2
-rw-r--r--synapse/rest/media/v0/content_repository.py3
-rw-r--r--synapse/rest/media/v1/media_repository.py3
-rw-r--r--synapse/rest/media/v1/media_storage.py7
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py3
-rw-r--r--synapse/server.py19
-rw-r--r--synapse/storage/account_data.py3
-rw-r--r--synapse/storage/appservice.py2
-rw-r--r--synapse/storage/background_updates.py6
-rw-r--r--synapse/storage/client_ips.py6
-rw-r--r--synapse/storage/deviceinbox.py13
-rw-r--r--synapse/storage/devices.py3
-rw-r--r--synapse/storage/end_to_end_keys.py3
-rw-r--r--synapse/storage/event_push_actions.py12
-rw-r--r--synapse/storage/events.py12
-rw-r--r--synapse/storage/events_worker.py13
-rw-r--r--synapse/storage/filtering.py3
-rw-r--r--synapse/storage/group_server.py2
-rw-r--r--synapse/storage/push_rule.py3
-rw-r--r--synapse/storage/pusher.py3
-rw-r--r--synapse/storage/receipts.py3
-rw-r--r--synapse/storage/registration.py4
-rw-r--r--synapse/storage/room.py3
-rw-r--r--synapse/storage/roommember.py4
-rw-r--r--synapse/storage/search.py2
-rw-r--r--synapse/storage/state.py60
-rw-r--r--synapse/storage/tags.py3
-rw-r--r--synapse/storage/transactions.py3
-rw-r--r--synapse/util/__init__.py33
-rw-r--r--synapse/util/async.py33
-rw-r--r--synapse/util/caches/dictionary_cache.py25
-rw-r--r--synapse/util/caches/stream_change_cache.py6
-rw-r--r--synapse/util/file_consumer.py16
-rw-r--r--synapse/util/frozenutils.py2
-rw-r--r--synapse/util/logcontext.py15
-rw-r--r--synapse/util/ratelimitutils.py3
92 files changed, 580 insertions, 356 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 78fc63aa49..faa183a99e 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,4 +17,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.31.1"
+__version__ = "0.31.2"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 06fa38366d..54186695cd 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -488,7 +488,7 @@ class Auth(object):
     def _look_up_user_by_access_token(self, token):
         ret = yield self.store.get_user_by_access_token(token)
         if not ret:
-            logger.warn("Unrecognised access token - not in store: %s" % (token,))
+            logger.warn("Unrecognised access token - not in store.")
             raise AuthError(
                 self.TOKEN_NOT_FOUND_HTTP_STATUS, "Unrecognised access token.",
                 errcode=Codes.UNKNOWN_TOKEN
@@ -511,7 +511,7 @@ class Auth(object):
             )
             service = self.store.get_app_service_by_token(token)
             if not service:
-                logger.warn("Unrecognised appservice access token: %s" % (token,))
+                logger.warn("Unrecognised appservice access token.")
                 raise AuthError(
                     self.TOKEN_NOT_FOUND_HTTP_STATUS,
                     "Unrecognised access token.",
@@ -655,7 +655,7 @@ class Auth(object):
             auth_events[(EventTypes.PowerLevels, "")] = power_level_event
 
         send_level = event_auth.get_send_level(
-            EventTypes.Aliases, "", auth_events
+            EventTypes.Aliases, "", power_level_event,
         )
         user_level = event_auth.get_user_power_level(user_id, auth_events)
 
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index e6ad3768f0..227a0713b2 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -17,7 +17,8 @@
 
 import logging
 
-import simplejson as json
+from canonicaljson import json
+
 from six import iteritems
 from six.moves import http_client
 
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index dbc0e7e445..aae25e7a47 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -17,7 +17,8 @@ from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, RoomID
 from twisted.internet import defer
 
-import simplejson as json
+from canonicaljson import json
+
 import jsonschema
 from jsonschema import FormatChecker
 
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index dd114dee07..4319ddce03 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -23,6 +23,7 @@ from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
@@ -62,7 +63,7 @@ class AppserviceServer(HomeServer):
         for res in listener_config["resources"]:
             for name in res["names"]:
                 if name == "metrics":
-                    resources[METRICS_PREFIX] = MetricsResource(self)
+                    resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -97,7 +98,7 @@ class AppserviceServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 85dada7f9f..654ddb8414 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -122,7 +122,7 @@ class ClientReaderServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 5ca77c0f1a..441467093a 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -138,7 +138,7 @@ class EventCreatorServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 2a1995d0cd..b2415cc671 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -111,7 +111,7 @@ class FederationReaderServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 81ad574043..13d2b70053 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -125,7 +125,7 @@ class FederationSenderServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 5a164a7a95..d2bae4ad03 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -176,7 +176,7 @@ class FrontendProxyServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 714f98a3e0..ae5fc751d5 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -266,7 +266,7 @@ class SynapseHomeServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -318,11 +318,6 @@ def setup(config_options):
     # check any extra requirements we have now we have a config
     check_requirements(config)
 
-    version_string = "Synapse/" + get_version_string(synapse)
-
-    logger.info("Server hostname: %s", config.server_name)
-    logger.info("Server version: %s", version_string)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
@@ -335,7 +330,7 @@ def setup(config_options):
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
         config=config,
-        version_string=version_string,
+        version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
     )
 
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 006bba80a8..19a682cce3 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -118,7 +118,7 @@ class MediaRepositoryServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 64df47f9cc..13cfbd08b0 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -128,7 +128,7 @@ class PusherServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 6808d6d3e0..82f06ea185 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index ada1c13cec..f5726e3df6 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -150,7 +150,7 @@ class UserDirectoryServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 6a7228dc2f..557c270fbe 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -12,17 +12,20 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from ._base import Config
-from synapse.util.logcontext import LoggingContextFilter
-from twisted.logger import globalLogBeginner, STDLibLogObserver
 import logging
 import logging.config
-import yaml
-from string import Template
 import os
 import signal
+from string import Template
+import sys
 
+from twisted.logger import STDLibLogObserver, globalLogBeginner
+import yaml
+
+import synapse
+from synapse.util.logcontext import LoggingContextFilter
+from synapse.util.versionstring import get_version_string
+from ._base import Config
 
 DEFAULT_LOG_CONFIG = Template("""
 version: 1
@@ -202,6 +205,15 @@ def setup_logging(config, use_worker_options=False):
     if getattr(signal, "SIGHUP"):
         signal.signal(signal.SIGHUP, sighup)
 
+    # make sure that the first thing we log is a thing we can grep backwards
+    # for
+    logging.warn("***** STARTING SERVER *****")
+    logging.warn(
+        "Server %s version %s",
+        sys.argv[0], get_version_string(synapse),
+    )
+    logging.info("Server hostname: %s", config.server_name)
+
     # It's critical to point twisted's internal logging somewhere, otherwise it
     # stacks up and leaks kup to 64K object;
     # see: https://twistedmatrix.com/trac/ticket/8164
diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py
index f1fd488b90..2a0eddbea1 100644
--- a/synapse/crypto/keyclient.py
+++ b/synapse/crypto/keyclient.py
@@ -18,7 +18,7 @@ from twisted.web.http import HTTPClient
 from twisted.internet.protocol import Factory
 from twisted.internet import defer, reactor
 from synapse.http.endpoint import matrix_federation_endpoint
-import simplejson as json
+from canonicaljson import json
 import logging
 
 
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index eaf9cecde6..f512d88145 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -34,9 +34,11 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
         event: the event being checked.
         auth_events (dict: event-key -> event): the existing room state.
 
+    Raises:
+        AuthError if the checks fail
 
     Returns:
-        True if the auth checks pass.
+         if the auth checks pass.
     """
     if do_size_check:
         _check_size_limits(event)
@@ -71,7 +73,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
         # Oh, we don't know what the state of the room was, so we
         # are trusting that this is allowed (at least for now)
         logger.warn("Trusting event: %s", event.event_id)
-        return True
+        return
 
     if event.type == EventTypes.Create:
         room_id_domain = get_domain_from_id(event.room_id)
@@ -81,7 +83,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
                 "Creation event's room_id domain does not match sender's"
             )
         # FIXME
-        return True
+        logger.debug("Allowing! %s", event)
+        return
 
     creation_event = auth_events.get((EventTypes.Create, ""), None)
 
@@ -118,7 +121,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
                 403,
                 "Alias event's state_key does not match sender's domain"
             )
-        return True
+        logger.debug("Allowing! %s", event)
+        return
 
     if logger.isEnabledFor(logging.DEBUG):
         logger.debug(
@@ -127,14 +131,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
         )
 
     if event.type == EventTypes.Member:
-        allowed = _is_membership_change_allowed(
-            event, auth_events
-        )
-        if allowed:
-            logger.debug("Allowing! %s", event)
-        else:
-            logger.debug("Denying! %s", event)
-        return allowed
+        _is_membership_change_allowed(event, auth_events)
+        logger.debug("Allowing! %s", event)
+        return
 
     _check_event_sender_in_room(event, auth_events)
 
@@ -153,7 +152,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
                 )
             )
         else:
-            return True
+            logger.debug("Allowing! %s", event)
+            return
 
     _can_send_event(event, auth_events)
 
@@ -200,7 +200,7 @@ def _is_membership_change_allowed(event, auth_events):
         create = auth_events.get(key)
         if create and event.prev_events[0][0] == create.event_id:
             if create.content["creator"] == event.state_key:
-                return True
+                return
 
     target_user_id = event.state_key
 
@@ -265,13 +265,13 @@ def _is_membership_change_allowed(event, auth_events):
             raise AuthError(
                 403, "%s is banned from the room" % (target_user_id,)
             )
-        return True
+        return
 
     if Membership.JOIN != membership:
         if (caller_invited
                 and Membership.LEAVE == membership
                 and target_user_id == event.user_id):
-            return True
+            return
 
         if not caller_in_room:  # caller isn't joined
             raise AuthError(
@@ -334,8 +334,6 @@ def _is_membership_change_allowed(event, auth_events):
     else:
         raise AuthError(500, "Unknown membership %s" % membership)
 
-    return True
-
 
 def _check_event_sender_in_room(event, auth_events):
     key = (EventTypes.Member, event.user_id, )
@@ -355,35 +353,46 @@ def _check_joined_room(member, user_id, room_id):
         ))
 
 
-def get_send_level(etype, state_key, auth_events):
-    key = (EventTypes.PowerLevels, "", )
-    send_level_event = auth_events.get(key)
-    send_level = None
-    if send_level_event:
-        send_level = send_level_event.content.get("events", {}).get(
-            etype
-        )
-        if send_level is None:
-            if state_key is not None:
-                send_level = send_level_event.content.get(
-                    "state_default", 50
-                )
-            else:
-                send_level = send_level_event.content.get(
-                    "events_default", 0
-                )
+def get_send_level(etype, state_key, power_levels_event):
+    """Get the power level required to send an event of a given type
+
+    The federation spec [1] refers to this as "Required Power Level".
+
+    https://matrix.org/docs/spec/server_server/unstable.html#definitions
 
-    if send_level:
-        send_level = int(send_level)
+    Args:
+        etype (str): type of event
+        state_key (str|None): state_key of state event, or None if it is not
+            a state event.
+        power_levels_event (synapse.events.EventBase|None): power levels event
+            in force at this point in the room
+    Returns:
+        int: power level required to send this event.
+    """
+
+    if power_levels_event:
+        power_levels_content = power_levels_event.content
     else:
-        send_level = 0
+        power_levels_content = {}
+
+    # see if we have a custom level for this event type
+    send_level = power_levels_content.get("events", {}).get(etype)
+
+    # otherwise, fall back to the state_default/events_default.
+    if send_level is None:
+        if state_key is not None:
+            send_level = power_levels_content.get("state_default", 50)
+        else:
+            send_level = power_levels_content.get("events_default", 0)
 
-    return send_level
+    return int(send_level)
 
 
 def _can_send_event(event, auth_events):
+    power_levels_event = _get_power_level_event(auth_events)
+
     send_level = get_send_level(
-        event.type, event.get("state_key", None), auth_events
+        event.type, event.get("state_key"), power_levels_event,
     )
     user_level = get_user_power_level(event.user_id, auth_events)
 
@@ -524,13 +533,22 @@ def _check_power_levels(event, auth_events):
 
 
 def _get_power_level_event(auth_events):
-    key = (EventTypes.PowerLevels, "", )
-    return auth_events.get(key)
+    return auth_events.get((EventTypes.PowerLevels, ""))
 
 
 def get_user_power_level(user_id, auth_events):
-    power_level_event = _get_power_level_event(auth_events)
+    """Get a user's power level
+
+    Args:
+        user_id (str): user's id to look up in power_levels
+        auth_events (dict[(str, str), synapse.events.EventBase]):
+            state in force at this point in the room (or rather, a subset of
+            it including at least the create event and power levels event.
 
+    Returns:
+        int: the user's power level in this room.
+    """
+    power_level_event = _get_power_level_event(auth_events)
     if power_level_event:
         level = power_level_event.content.get("users", {}).get(user_id)
         if not level:
@@ -541,6 +559,11 @@ def get_user_power_level(user_id, auth_events):
         else:
             return int(level)
     else:
+        # if there is no power levels event, the creator gets 100 and everyone
+        # else gets 0.
+
+        # some things which call this don't pass the create event: hack around
+        # that.
         key = (EventTypes.Create, "", )
         create_event = auth_events.get(key)
         if (create_event is not None and
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d4dd967c60..a00420a24b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
 
-import simplejson as json
+from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index f0aeb5a0d3..d72b057e28 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -21,7 +21,6 @@ from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException, FederationDeniedError
 from synapse.util import logcontext, PreserveLoggingContext
-from synapse.util.async import run_on_reactor
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
 from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
@@ -42,8 +41,11 @@ import logging
 
 logger = logging.getLogger(__name__)
 
-sent_pdus_destination_dist = Counter(
-    "synapse_federation_transaction_queue_sent_pdu_destinations", ""
+sent_pdus_destination_dist_count = Counter(
+    "synapse_federation_client_sent_pdu_destinations:count", ""
+)
+sent_pdus_destination_dist_total = Counter(
+    "synapse_federation_client_sent_pdu_destinations:total", ""
 )
 
 
@@ -280,7 +282,8 @@ class TransactionQueue(object):
         if not destinations:
             return
 
-        sent_pdus_destination_dist.inc(len(destinations))
+        sent_pdus_destination_dist_total.inc(len(destinations))
+        sent_pdus_destination_dist_count.inc()
 
         for destination in destinations:
             self.pending_pdus_by_dest.setdefault(destination, []).append(
@@ -451,9 +454,6 @@ class TransactionQueue(object):
             # hence why we throw the result away.
             yield get_retry_limiter(destination, self.clock, self.store)
 
-            # XXX: what's this for?
-            yield run_on_reactor()
-
             pending_pdus = []
             while True:
                 device_message_edus, device_stream_id, dev_list_id = (
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 912136534d..cbef1f2770 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,8 +13,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 from twisted.internet import defer, threads
 
+from canonicaljson import json
+
 from ._base import BaseHandler
 from synapse.api.constants import LoginType
 from synapse.api.errors import (
@@ -23,7 +26,6 @@ from synapse.api.errors import (
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
-from synapse.util.async import run_on_reactor
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logcontext import make_deferred_yieldable
 
@@ -32,7 +34,7 @@ from twisted.web.client import PartialDownloadError
 import logging
 import bcrypt
 import pymacaroons
-import simplejson
+import attr
 
 import synapse.util.stringutils as stringutils
 
@@ -402,7 +404,7 @@ class AuthHandler(BaseHandler):
         except PartialDownloadError as pde:
             # Twisted is silly
             data = pde.response
-            resp_body = simplejson.loads(data)
+            resp_body = json.loads(data)
 
         if 'success' in resp_body:
             # Note that we do NOT check the hostname here: we explicitly
@@ -423,15 +425,11 @@ class AuthHandler(BaseHandler):
     def _check_msisdn(self, authdict, _):
         return self._check_threepid('msisdn', authdict)
 
-    @defer.inlineCallbacks
     def _check_dummy_auth(self, authdict, _):
-        yield run_on_reactor()
-        defer.returnValue(True)
+        return defer.succeed(True)
 
     @defer.inlineCallbacks
     def _check_threepid(self, medium, authdict):
-        yield run_on_reactor()
-
         if 'threepid_creds' not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
@@ -858,7 +856,11 @@ class AuthHandler(BaseHandler):
             return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
                                  bcrypt.gensalt(self.bcrypt_rounds))
 
-        return make_deferred_yieldable(threads.deferToThread(_do_hash))
+        return make_deferred_yieldable(
+            threads.deferToThreadPool(
+                self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
+            ),
+        )
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
@@ -878,16 +880,21 @@ class AuthHandler(BaseHandler):
             )
 
         if stored_hash:
-            return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
+            return make_deferred_yieldable(
+                threads.deferToThreadPool(
+                    self.hs.get_reactor(),
+                    self.hs.get_reactor().getThreadPool(),
+                    _do_validate_hash,
+                ),
+            )
         else:
             return defer.succeed(False)
 
 
-class MacaroonGeneartor(object):
-    def __init__(self, hs):
-        self.clock = hs.get_clock()
-        self.server_name = hs.config.server_name
-        self.macaroon_secret_key = hs.config.macaroon_secret_key
+@attr.s
+class MacaroonGenerator(object):
+
+    hs = attr.ib()
 
     def generate_access_token(self, user_id, extra_caveats=None):
         extra_caveats = extra_caveats or []
@@ -905,7 +912,7 @@ class MacaroonGeneartor(object):
     def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
         macaroon = self._generate_base_macaroon(user_id)
         macaroon.add_first_party_caveat("type = login")
-        now = self.clock.time_msec()
+        now = self.hs.get_clock().time_msec()
         expiry = now + duration_in_ms
         macaroon.add_first_party_caveat("time < %d" % (expiry,))
         return macaroon.serialize()
@@ -917,9 +924,9 @@ class MacaroonGeneartor(object):
 
     def _generate_base_macaroon(self, user_id):
         macaroon = pymacaroons.Macaroon(
-            location=self.server_name,
+            location=self.hs.config.server_name,
             identifier="key",
-            key=self.macaroon_secret_key)
+            key=self.hs.config.macaroon_secret_key)
         macaroon.add_first_party_caveat("gen = 1")
         macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
         return macaroon
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 404b662469..a84b7b8b80 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 
 from ._base import BaseHandler
 from synapse.types import UserID, create_requester
@@ -39,7 +39,7 @@ class DeactivateAccountHandler(BaseHandler):
 
         # Start the user parter loop so it can resume parting users from rooms where
         # it left off (if it has work left to do).
-        reactor.callWhenRunning(self._start_user_parting)
+        hs.get_reactor().callWhenRunning(self._start_user_parting)
 
     @defer.inlineCallbacks
     def deactivate_account(self, user_id, erase_data):
@@ -47,6 +47,7 @@ class DeactivateAccountHandler(BaseHandler):
 
         Args:
             user_id (str): ID of user to be deactivated
+            erase_data (bool): whether to GDPR-erase the user's data
 
         Returns:
             Deferred
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 8a2d177539..62b4892a4e 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -14,10 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import simplejson as json
 import logging
 
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
 from twisted.internet import defer
 from six import iteritems
 
@@ -80,7 +79,7 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids
 
-        # Firt get local devices.
+        # First get local devices.
         failures = {}
         results = {}
         if local_query:
@@ -357,7 +356,7 @@ def _exception_to_failure(e):
     # include ConnectionRefused and other errors
     #
     # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
-    # give a string for e.message, which simplejson then fails to serialize.
+    # give a string for e.message, which json then fails to serialize.
     return {
         "status": 503, "message": str(e.message),
     }
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 6f4adf102b..b6f8d4cf82 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -39,7 +39,7 @@ from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError, logcontext
 from synapse.util.metrics import measure_func
 from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, Linearizer
+from synapse.util.async import Linearizer
 from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
     compute_event_signature, add_hashes_and_signatures,
@@ -460,6 +460,47 @@ class FederationHandler(BaseHandler):
     @measure_func("_filter_events_for_server")
     @defer.inlineCallbacks
     def _filter_events_for_server(self, server_name, room_id, events):
+        """Filter the given events for the given server, redacting those the
+        server can't see.
+
+        Assumes the server is currently in the room.
+
+        Returns
+            list[FrozenEvent]
+        """
+        # First lets check to see if all the events have a history visibility
+        # of "shared" or "world_readable". If thats the case then we don't
+        # need to check membership (as we know the server is in the room).
+        event_to_state_ids = yield self.store.get_state_ids_for_events(
+            frozenset(e.event_id for e in events),
+            types=(
+                (EventTypes.RoomHistoryVisibility, ""),
+            )
+        )
+
+        visibility_ids = set()
+        for sids in event_to_state_ids.itervalues():
+            hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
+            if hist:
+                visibility_ids.add(hist)
+
+        # If we failed to find any history visibility events then the default
+        # is "shared" visiblity.
+        if not visibility_ids:
+            defer.returnValue(events)
+
+        event_map = yield self.store.get_events(visibility_ids)
+        all_open = all(
+            e.content.get("history_visibility") in (None, "shared", "world_readable")
+            for e in event_map.itervalues()
+        )
+
+        if all_open:
+            defer.returnValue(events)
+
+        # Ok, so we're dealing with events that have non-trivial visibility
+        # rules, so we need to also get the memberships of the room.
+
         event_to_state_ids = yield self.store.get_state_ids_for_events(
             frozenset(e.event_id for e in events),
             types=(
@@ -1396,8 +1437,6 @@ class FederationHandler(BaseHandler):
     def get_state_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
-
         state_groups = yield self.store.get_state_groups(
             room_id, [event_id]
         )
@@ -1440,8 +1479,6 @@ class FederationHandler(BaseHandler):
     def get_state_ids_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
-
         state_groups = yield self.store.get_state_groups_ids(
             room_id, [event_id]
         )
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 529400955d..277c2b7760 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -19,7 +19,7 @@
 
 import logging
 
-import simplejson as json
+from canonicaljson import json
 
 from twisted.internet import defer
 
@@ -27,7 +27,6 @@ from synapse.api.errors import (
     MatrixCodeMessageException, CodeMessageException
 )
 from ._base import BaseHandler
-from synapse.util.async import run_on_reactor
 from synapse.api.errors import SynapseError, Codes
 
 logger = logging.getLogger(__name__)
@@ -62,8 +61,6 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def threepid_from_creds(self, creds):
-        yield run_on_reactor()
-
         if 'id_server' in creds:
             id_server = creds['id_server']
         elif 'idServer' in creds:
@@ -106,7 +103,6 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def bind_threepid(self, creds, mxid):
-        yield run_on_reactor()
         logger.debug("binding threepid %r to %s", creds, mxid)
         data = None
 
@@ -188,8 +184,6 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
-        yield run_on_reactor()
-
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
@@ -224,8 +218,6 @@ class IdentityHandler(BaseHandler):
             self, id_server, country, phone_number,
             client_secret, send_attempt, **kwargs
     ):
-        yield run_on_reactor()
-
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1cb81b6cf8..cbadf3c88e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,13 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import simplejson
 import sys
 
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
 import six
 from six import string_types, itervalues, iteritems
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.defer import succeed
 from twisted.python.failure import Failure
 
@@ -36,7 +35,7 @@ from synapse.events.validator import EventValidator
 from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
-from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
+from synapse.util.async import ReadWriteLock, Limiter
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import frozendict_json_encoder
@@ -157,7 +156,7 @@ class MessageHandler(BaseHandler):
             # remove the purge from the list 24 hours after it completes
             def clear_purge():
                 del self._purges_by_id[purge_id]
-            reactor.callLater(24 * 3600, clear_purge)
+            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
 
     def get_purge_status(self, purge_id):
         """Get the current status of an active purge
@@ -491,7 +490,7 @@ class EventCreationHandler(object):
                         target, e
                     )
 
-        is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+        is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
         if not is_exempt:
             yield self.assert_accepted_privacy_policy(requester)
 
@@ -509,12 +508,13 @@ class EventCreationHandler(object):
 
         defer.returnValue((event, context))
 
-    def _is_exempt_from_privacy_policy(self, builder):
+    def _is_exempt_from_privacy_policy(self, builder, requester):
         """"Determine if an event to be sent is exempt from having to consent
         to the privacy policy
 
         Args:
             builder (synapse.events.builder.EventBuilder): event being created
+            requester (Requster): user requesting this event
 
         Returns:
             Deferred[bool]: true if the event can be sent without the user
@@ -525,6 +525,9 @@ class EventCreationHandler(object):
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
                 return self._is_server_notices_room(builder.room_id)
+            elif membership == Membership.LEAVE:
+                # the user is always allowed to leave (but not kick people)
+                return builder.state_key == requester.user.to_string()
         return succeed(False)
 
     @defer.inlineCallbacks
@@ -793,7 +796,7 @@ class EventCreationHandler(object):
         # Ensure that we can round trip before trying to persist in db
         try:
             dump = frozendict_json_encoder.encode(event.content)
-            simplejson.loads(dump)
+            json.loads(dump)
         except Exception:
             logger.exception("Failed to encode content: %r", event.content)
             raise
@@ -806,6 +809,7 @@ class EventCreationHandler(object):
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
                 yield send_event_to_master(
+                    self.hs.get_clock(),
                     self.http_client,
                     host=self.config.worker_replication_host,
                     port=self.config.worker_replication_http_port,
@@ -959,9 +963,7 @@ class EventCreationHandler(object):
             event_stream_id, max_stream_id
         )
 
-        @defer.inlineCallbacks
         def _notify():
-            yield run_on_reactor()
             try:
                 self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7fe568132f..7db59fba00 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -22,7 +22,7 @@ The methods that define policy are:
     - should_notify
 """
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from contextlib import contextmanager
 
 from six import itervalues, iteritems
@@ -179,7 +179,7 @@ class PresenceHandler(object):
         # have not yet been persisted
         self.unpersisted_users_changes = set()
 
-        reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
 
         self.serial_to_user = {}
         self._next_serial = 1
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7e52adda3c..e76ef5426d 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -24,7 +24,7 @@ from synapse.api.errors import (
 from synapse.http.client import CaptchaServerHttpClient
 from synapse import types
 from synapse.types import UserID, create_requester, RoomID, RoomAlias
-from synapse.util.async import run_on_reactor, Linearizer
+from synapse.util.async import Linearizer
 from synapse.util.threepids import check_3pid_allowed
 from ._base import BaseHandler
 
@@ -139,7 +139,6 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield run_on_reactor()
         password_hash = None
         if password:
             password_hash = yield self.auth_handler().hash(password)
@@ -431,8 +430,6 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield run_on_reactor()
-
         if localpart is None:
             raise SynapseError(400, "Request must include user id")
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2abd63ad05..ab72963d87 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,7 @@ from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
 from synapse.api.constants import (
     EventTypes, JoinRules, RoomCreationPreset
 )
-from synapse.api.errors import AuthError, StoreError, SynapseError
+from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
 from synapse.util import stringutils
 from synapse.visibility import filter_events_for_client
 
@@ -115,7 +115,11 @@ class RoomCreationHandler(BaseHandler):
             )
 
             if mapping:
-                raise SynapseError(400, "Room alias already taken")
+                raise SynapseError(
+                    400,
+                    "Room alias already taken",
+                    Codes.ROOM_IN_USE
+                )
         else:
             room_alias = None
 
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 1eca26aa1e..2e3a77ca4b 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -64,6 +64,13 @@ class SearchHandler(BaseHandler):
             except Exception:
                 raise SynapseError(400, "Invalid batch")
 
+        logger.info(
+            "Search batch properties: %r, %r, %r",
+            batch_group, batch_group_key, batch_token,
+        )
+
+        logger.info("Search content: %s", content)
+
         try:
             room_cat = content["search_categories"]["room_events"]
 
@@ -271,6 +278,8 @@ class SearchHandler(BaseHandler):
             # We should never get here due to the guard earlier.
             raise NotImplementedError()
 
+        logger.info("Found %d events to return", len(allowed_events))
+
         # If client has asked for "context" for each event (i.e. some surrounding
         # events and state), fetch that
         if event_context is not None:
@@ -282,6 +291,11 @@ class SearchHandler(BaseHandler):
                     event.room_id, event.event_id, before_limit, after_limit
                 )
 
+                logger.info(
+                    "Context for search returned %d and %d events",
+                    len(res["events_before"]), len(res["events_after"]),
+                )
+
                 res["events_before"] = yield filter_events_for_client(
                     self.store, user.to_string(), res["events_before"]
                 )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 51ec727df0..7f486e48e5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -145,7 +145,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
     "invited",  # InvitedSyncResult for each invited room.
     "archived",  # ArchivedSyncResult for each archived room.
     "to_device",  # List of direct messages for the device.
-    "device_lists",  # List of user_ids whose devices have chanegd
+    "device_lists",  # List of user_ids whose devices have changed
     "device_one_time_keys_count",  # Dict of algorithm to count for one time keys
                                    # for this device
     "groups",
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index a39f0f7343..7e4a114d4f 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -19,7 +19,6 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.storage.roommember import ProfileInfo
 from synapse.util.metrics import Measure
-from synapse.util.async import sleep
 from synapse.types import get_localpart_from_id
 
 from six import iteritems
@@ -174,7 +173,7 @@ class UserDirectoryHandler(object):
             logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
             yield self._handle_initial_room(room_id)
             num_processed_rooms += 1
-            yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
         logger.info("Processed all rooms.")
 
@@ -188,7 +187,7 @@ class UserDirectoryHandler(object):
                 logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
                 yield self._handle_local_user(user_id)
                 num_processed_users += 1
-                yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
+                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
 
             logger.info("Processed all users")
 
@@ -236,7 +235,7 @@ class UserDirectoryHandler(object):
         count = 0
         for user_id in user_ids:
             if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
             if not self.is_mine_id(user_id):
                 count += 1
@@ -251,7 +250,7 @@ class UserDirectoryHandler(object):
                     continue
 
                 if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
                 count += 1
 
                 user_set = (user_id, other_user_id)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 8064a84c5c..5bdc484c15 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -42,7 +42,7 @@ from twisted.web._newclient import ResponseDone
 from six import StringIO
 
 from prometheus_client import Counter
-import simplejson as json
+from canonicaljson import json
 import logging
 import urllib
 
@@ -98,8 +98,8 @@ class SimpleHttpClient(object):
                 method, uri, *args, **kwargs
             )
             add_timeout_to_deferred(
-                request_deferred,
-                60, cancelled_to_request_timed_out_error,
+                request_deferred, 60, self.hs.get_reactor(),
+                cancelled_to_request_timed_out_error,
             )
             response = yield make_deferred_yieldable(request_deferred)
 
@@ -115,7 +115,7 @@ class SimpleHttpClient(object):
                 "Error sending request to  %s %s: %s %s",
                 method, redact_uri(uri), type(e).__name__, e.message
             )
-            raise e
+            raise
 
     @defer.inlineCallbacks
     def post_urlencoded_get_json(self, uri, args={}, headers=None):
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index 87a482650d..80da870584 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.error import ConnectError
 from twisted.names import client, dns
 from twisted.names.error import DNSNameError, DomainError
@@ -74,21 +74,22 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
             reactor, "matrix", domain, protocol="tcp",
             default_port=default_port, endpoint=transport_endpoint,
             endpoint_kw_args=endpoint_kw_args
-        ))
+        ), reactor)
     else:
         return _WrappingEndpointFac(transport_endpoint(
             reactor, domain, port, **endpoint_kw_args
-        ))
+        ), reactor)
 
 
 class _WrappingEndpointFac(object):
-    def __init__(self, endpoint_fac):
+    def __init__(self, endpoint_fac, reactor):
         self.endpoint_fac = endpoint_fac
+        self.reactor = reactor
 
     @defer.inlineCallbacks
     def connect(self, protocolFactory):
         conn = yield self.endpoint_fac.connect(protocolFactory)
-        conn = _WrappedConnection(conn)
+        conn = _WrappedConnection(conn, self.reactor)
         defer.returnValue(conn)
 
 
@@ -98,9 +99,10 @@ class _WrappedConnection(object):
     """
     __slots__ = ["conn", "last_request"]
 
-    def __init__(self, conn):
+    def __init__(self, conn, reactor):
         object.__setattr__(self, "conn", conn)
         object.__setattr__(self, "last_request", time.time())
+        self._reactor = reactor
 
     def __getattr__(self, name):
         return getattr(self.conn, name)
@@ -131,14 +133,14 @@ class _WrappedConnection(object):
         # Time this connection out if we haven't send a request in the last
         # N minutes
         # TODO: Cancel the previous callLater?
-        reactor.callLater(3 * 60, self._time_things_out_maybe)
+        self._reactor.callLater(3 * 60, self._time_things_out_maybe)
 
         d = self.conn.request(request)
 
         def update_request_time(res):
             self.last_request = time.time()
             # TODO: Cancel the previous callLater?
-            reactor.callLater(3 * 60, self._time_things_out_maybe)
+            self._reactor.callLater(3 * 60, self._time_things_out_maybe)
             return res
 
         d.addCallback(update_request_time)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 993dc06e02..2cb9e3e231 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -22,12 +22,12 @@ from twisted.web._newclient import ResponseDone
 from synapse.http import cancelled_to_request_timed_out_error
 from synapse.http.endpoint import matrix_federation_endpoint
 import synapse.metrics
-from synapse.util.async import sleep, add_timeout_to_deferred
+from synapse.util.async import add_timeout_to_deferred
 from synapse.util import logcontext
 from synapse.util.logcontext import make_deferred_yieldable
 import synapse.util.retryutils
 
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
 
 from synapse.api.errors import (
     SynapseError, Codes, HttpResponseException, FederationDeniedError,
@@ -36,7 +36,6 @@ from synapse.api.errors import (
 from signedjson.sign import sign_json
 
 import cgi
-import simplejson as json
 import logging
 import random
 import sys
@@ -193,6 +192,7 @@ class MatrixFederationHttpClient(object):
                         add_timeout_to_deferred(
                             request_deferred,
                             timeout / 1000. if timeout else 60,
+                            self.hs.get_reactor(),
                             cancelled_to_request_timed_out_error,
                         )
                         response = yield make_deferred_yieldable(
@@ -234,7 +234,7 @@ class MatrixFederationHttpClient(object):
                                 delay = min(delay, 2)
                                 delay *= random.uniform(0.8, 1.4)
 
-                            yield sleep(delay)
+                            yield self.clock.sleep(delay)
                             retries_left -= 1
                         else:
                             raise
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index dc06f6c443..1b711ca2de 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -117,13 +117,17 @@ def _get_in_flight_counts():
     Returns:
         dict[tuple[str, str], int]
     """
-    for rm in _in_flight_requests:
+    # Cast to a list to prevent it changing while the Prometheus
+    # thread is collecting metrics
+    reqs = list(_in_flight_requests)
+
+    for rm in reqs:
         rm.update_metrics()
 
     # Map from (method, name) -> int, the number of in flight requests of that
     # type
     counts = {}
-    for rm in _in_flight_requests:
+    for rm in reqs:
         key = (rm.method, rm.name,)
         counts[key] = counts.get(key, 0) + 1
 
@@ -131,7 +135,7 @@ def _get_in_flight_counts():
 
 
 LaterGauge(
-    "synapse_http_request_metrics_in_flight_requests_count",
+    "synapse_http_server_in_flight_requests_count",
     "",
     ["method", "servlet"],
     _get_in_flight_counts,
diff --git a/synapse/http/server.py b/synapse/http/server.py
index bc09b8b2be..517aaf7b5a 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -29,7 +29,7 @@ import synapse.metrics
 import synapse.events
 
 from canonicaljson import (
-    encode_canonical_json, encode_pretty_printed_json
+    encode_canonical_json, encode_pretty_printed_json, json
 )
 
 from twisted.internet import defer
@@ -41,7 +41,6 @@ from twisted.web.util import redirectTo
 import collections
 import logging
 import urllib
-import simplejson
 
 logger = logging.getLogger(__name__)
 
@@ -410,7 +409,7 @@ def respond_with_json(request, code, json_object, send_cors=False,
         if canonical_json or synapse.events.USE_FROZEN_DICTS:
             json_bytes = encode_canonical_json(json_object)
         else:
-            json_bytes = simplejson.dumps(json_object)
+            json_bytes = json.dumps(json_object)
 
     return respond_with_json_bytes(
         request, code, json_bytes,
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index ef8e62901b..ef3a01ddc7 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -18,7 +18,9 @@
 from synapse.api.errors import SynapseError, Codes
 
 import logging
-import simplejson
+
+from canonicaljson import json
+
 
 logger = logging.getLogger(__name__)
 
@@ -171,7 +173,7 @@ def parse_json_value_from_request(request, allow_empty_body=False):
         return None
 
     try:
-        content = simplejson.loads(content_bytes)
+        content = json.loads(content_bytes)
     except Exception as e:
         logger.warn("Unable to parse JSON: %s", e)
         raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 2664006f8c..74a752d6cf 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -99,16 +99,18 @@ class SynapseRequest(Request):
             db_txn_count = context.db_txn_count
             db_txn_duration_sec = context.db_txn_duration_sec
             db_sched_duration_sec = context.db_sched_duration_sec
+            evt_db_fetch_count = context.evt_db_fetch_count
         except Exception:
             ru_utime, ru_stime = (0, 0)
             db_txn_count, db_txn_duration_sec = (0, 0)
+            evt_db_fetch_count = 0
 
         end_time = time.time()
 
         self.site.access_logger.info(
             "%s - %s - {%s}"
             " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
-            " %sB %s \"%s %s %s\" \"%s\"",
+            " %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
             self.getClientIP(),
             self.site.site_tag,
             self.authenticated_entity,
@@ -124,6 +126,7 @@ class SynapseRequest(Request):
             self.get_redacted_uri(),
             self.clientproto,
             self.get_user_agent(),
+            evt_db_fetch_count,
         )
 
         try:
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 429e79c472..2d2397caae 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -62,7 +62,7 @@ class LaterGauge(object):
             calls = self.caller()
         except Exception:
             logger.exception(
-                "Exception running callback for LaterGuage(%s)",
+                "Exception running callback for LaterGauge(%s)",
                 self.name,
             )
             yield g
@@ -140,14 +140,15 @@ gc_time = Histogram(
 class GCCounts(object):
 
     def collect(self):
-        cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"])
+        cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
         for n, m in enumerate(gc.get_count()):
             cm.add_metric([str(n)], m)
 
         yield cm
 
 
-REGISTRY.register(GCCounts())
+if not running_on_pypy:
+    REGISTRY.register(GCCounts())
 
 #
 # Twisted reactor metrics
@@ -190,6 +191,22 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
 # finished being processed.
 event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
 
+last_ticked = time.time()
+
+
+class ReactorLastSeenMetric(object):
+
+    def collect(self):
+        cm = GaugeMetricFamily(
+            "python_twisted_reactor_last_seen",
+            "Seconds since the Twisted reactor was last seen",
+        )
+        cm.add_metric([], time.time() - last_ticked)
+        yield cm
+
+
+REGISTRY.register(ReactorLastSeenMetric())
+
 
 def runUntilCurrentTimer(func):
 
@@ -222,6 +239,11 @@ def runUntilCurrentTimer(func):
         tick_time.observe(end - start)
         pending_calls_metric.observe(num_pending)
 
+        # Update the time we last ticked, for the metric to test whether
+        # Synapse's reactor has frozen
+        global last_ticked
+        last_ticked = end
+
         if running_on_pypy:
             return ret
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6dce20a284..3c0622a294 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -161,6 +161,7 @@ class Notifier(object):
         self.user_to_user_stream = {}
         self.room_to_user_streams = {}
 
+        self.hs = hs
         self.event_sources = hs.get_event_sources()
         self.store = hs.get_datastore()
         self.pending_new_room_events = []
@@ -340,6 +341,7 @@ class Notifier(object):
                     add_timeout_to_deferred(
                         listener.deferred,
                         (end_time - now) / 1000.,
+                        self.hs.get_reactor(),
                     )
                     with PreserveLoggingContext():
                         yield listener.deferred
@@ -561,6 +563,7 @@ class Notifier(object):
             add_timeout_to_deferred(
                 listener.deferred.addTimeout,
                 (end_time - now) / 1000.,
+                self.hs.get_reactor(),
             )
             try:
                 with PreserveLoggingContext():
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index ba7286cb72..52d4f087ee 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 import logging
@@ -199,7 +199,7 @@ class EmailPusher(object):
                     self.timed_call = None
 
         if soonest_due_at is not None:
-            self.timed_call = reactor.callLater(
+            self.timed_call = self.hs.get_reactor().callLater(
                 self.seconds_until(soonest_due_at), self.on_timer
             )
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bf7ff74a1a..7a481b5a1e 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from . import push_rule_evaluator
@@ -220,7 +220,9 @@ class HttpPusher(object):
                     )
                 else:
                     logger.info("Push failed: delaying for %ds", self.backoff_delay)
-                    self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+                    self.timed_call = self.hs.get_reactor().callLater(
+                        self.backoff_delay, self.on_timer
+                    )
                     self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
                     break
 
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 750d11ca38..36bb5bbc65 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,7 +19,6 @@ import logging
 from twisted.internet import defer
 
 from synapse.push.pusher import PusherFactory
-from synapse.util.async import run_on_reactor
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
@@ -125,7 +124,6 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_id, max_stream_id):
-        yield run_on_reactor()
         try:
             users_affected = yield self.store.get_push_action_users_in_range(
                 min_stream_id, max_stream_id
@@ -151,7 +149,6 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
-        yield run_on_reactor()
         try:
             # Need to subtract 1 from the minimum because the lower bound here
             # is not inclusive
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index a9baa2c1c3..f080f96cc1 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -21,7 +21,6 @@ from synapse.api.errors import (
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
-from synapse.util.async import sleep
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.metrics import Measure
 from synapse.types import Requester, UserID
@@ -33,11 +32,12 @@ logger = logging.getLogger(__name__)
 
 
 @defer.inlineCallbacks
-def send_event_to_master(client, host, port, requester, event, context,
+def send_event_to_master(clock, client, host, port, requester, event, context,
                          ratelimit, extra_users):
     """Send event to be handled on the master
 
     Args:
+        clock (synapse.util.Clock)
         client (SimpleHttpClient)
         host (str): host of master
         port (int): port on master listening for HTTP replication
@@ -77,7 +77,7 @@ def send_event_to_master(client, host, port, requester, event, context,
 
             # If we timed out we probably don't need to worry about backing
             # off too much, but lets just wait a little anyway.
-            yield sleep(1)
+            yield clock.sleep(1)
     except MatrixCodeMessageException as e:
         # We convert to SynapseError as we know that it was a SynapseError
         # on the master process that we should send to the client. (And
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index b1f64ef0d8..97d3196633 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.storage.state import StateGroupWorkerStore
 from synapse.storage.stream import StreamWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
+from synapse.storage.user_erasure_store import UserErasureWorkerStore
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
@@ -45,6 +46,7 @@ class SlavedEventStore(EventFederationWorkerStore,
                        EventsWorkerStore,
                        StateGroupWorkerStore,
                        SignatureWorkerStore,
+                       UserErasureWorkerStore,
                        BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 6d2513c4e2..bb852b00af 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -15,7 +15,7 @@
 """A replication client for use by synapse workers.
 """
 
-from twisted.internet import reactor, defer
+from twisted.internet import defer
 from twisted.internet.protocol import ReconnectingClientFactory
 
 from .commands import (
@@ -44,7 +44,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
         self.server_name = hs.config.server_name
         self._clock = hs.get_clock()  # As self.clock is defined in super class
 
-        reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
 
     def startedConnecting(self, connector):
         logger.info("Connecting to replication: %r", connector.getDestination())
@@ -95,7 +95,7 @@ class ReplicationClientHandler(object):
         factory = ReplicationClientFactory(hs, client_name, self)
         host = hs.config.worker_replication_host
         port = hs.config.worker_replication_port
-        reactor.connectTCP(host, port, factory)
+        hs.get_reactor().connectTCP(host, port, factory)
 
     def on_rdata(self, stream_name, token, rows):
         """Called when we get new replication data. By default this just pokes
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 12aac3cc6b..f3908df642 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -19,13 +19,17 @@ allowed to be sent by which side.
 """
 
 import logging
-import simplejson
+import platform
 
+if platform.python_implementation() == "PyPy":
+    import json
+    _json_encoder = json.JSONEncoder()
+else:
+    import simplejson as json
+    _json_encoder = json.JSONEncoder(namedtuple_as_object=False)
 
 logger = logging.getLogger(__name__)
 
-_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
-
 
 class Command(object):
     """The base command class.
@@ -102,7 +106,7 @@ class RdataCommand(Command):
         return cls(
             stream_name,
             None if token == "batch" else int(token),
-            simplejson.loads(row_json)
+            json.loads(row_json)
         )
 
     def to_line(self):
@@ -300,7 +304,7 @@ class InvalidateCacheCommand(Command):
     def from_line(cls, line):
         cache_func, keys_json = line.split(" ", 1)
 
-        return cls(cache_func, simplejson.loads(keys_json))
+        return cls(cache_func, json.loads(keys_json))
 
     def to_line(self):
         return " ".join((
@@ -329,7 +333,7 @@ class UserIpCommand(Command):
     def from_line(cls, line):
         user_id, jsn = line.split(" ", 1)
 
-        access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
+        access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
 
         return cls(
             user_id, access_token, ip, user_agent, device_id, last_seen
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index c870475cd1..171a698e14 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -564,11 +564,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
 # The following simply registers metrics for the replication connections
 
 pending_commands = LaterGauge(
-    "pending_commands", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_pending_commands",
+    "",
+    ["name", "conn_id"],
     lambda: {
-        (p.name, p.conn_id): len(p.pending_commands)
-        for p in connected_connections
-    })
+        (p.name, p.conn_id): len(p.pending_commands) for p in connected_connections
+    },
+)
 
 
 def transport_buffer_size(protocol):
@@ -579,11 +581,13 @@ def transport_buffer_size(protocol):
 
 
 transport_send_buffer = LaterGauge(
-    "synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_transport_send_buffer",
+    "",
+    ["name", "conn_id"],
     lambda: {
-        (p.name, p.conn_id): transport_buffer_size(p)
-        for p in connected_connections
-    })
+        (p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections
+    },
+)
 
 
 def transport_kernel_read_buffer_size(protocol, read=True):
@@ -602,37 +606,50 @@ def transport_kernel_read_buffer_size(protocol, read=True):
 
 
 tcp_transport_kernel_send_buffer = LaterGauge(
-    "synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_transport_kernel_send_buffer",
+    "",
+    ["name", "conn_id"],
     lambda: {
         (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
-    })
+    },
+)
 
 
 tcp_transport_kernel_read_buffer = LaterGauge(
-    "synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_transport_kernel_read_buffer",
+    "",
+    ["name", "conn_id"],
     lambda: {
         (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
-    })
+    },
+)
 
 
 tcp_inbound_commands = LaterGauge(
-    "synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
+    "synapse_replication_tcp_protocol_inbound_commands",
+    "",
+    ["command", "name", "conn_id"],
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
         for k, count in iteritems(p.inbound_commands_counter)
-    })
+    },
+)
 
 tcp_outbound_commands = LaterGauge(
-    "synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
+    "synapse_replication_tcp_protocol_outbound_commands",
+    "",
+    ["command", "name", "conn_id"],
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
         for k, count in iteritems(p.outbound_commands_counter)
-    })
+    },
+)
 
 # number of updates received for each RDATA stream
-inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "",
-                              ["stream_name"])
+inbound_rdata_count = Counter(
+    "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
+)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 63bd6d2652..95ad8c1b4c 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -15,7 +15,7 @@
 """The server side of the replication stream.
 """
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.protocol import Factory
 
 from .streams import STREAMS_MAP, FederationStream
@@ -109,7 +109,7 @@ class ReplicationStreamer(object):
         self.is_looping = False
         self.pending_updates = False
 
-        reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown)
 
     def on_shutdown(self):
         # close all connections on shutdown
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index ddaedb2a8c..8fb08dc526 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -16,6 +16,8 @@
 
 from twisted.internet import defer
 
+from six.moves import http_client
+
 from synapse.api.constants import Membership
 from synapse.api.errors import AuthError, SynapseError, Codes, NotFoundError
 from synapse.types import UserID, create_requester
@@ -247,6 +249,15 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request, target_user_id):
+        body = parse_json_object_from_request(request, allow_empty_body=True)
+        erase = body.get("erase", False)
+        if not isinstance(erase, bool):
+            raise SynapseError(
+                http_client.BAD_REQUEST,
+                "Param 'erase' must be a boolean, if given",
+                Codes.BAD_JSON,
+            )
+
         UserID.from_string(target_user_id)
         requester = yield self.auth.get_user_by_req(request)
         is_admin = yield self.auth.is_server_admin(requester.user)
@@ -255,7 +266,7 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
             raise AuthError(403, "You are not a server admin")
 
         yield self._deactivate_account_handler.deactivate_account(
-            target_user_id, False,
+            target_user_id, erase,
         )
         defer.returnValue((200, {}))
 
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 34df5be4e9..88ca5184cd 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -23,7 +23,8 @@ from synapse.util.msisdn import phone_number_to_msisdn
 
 from .base import ClientV1RestServlet, client_path_patterns
 
-import simplejson as json
+from canonicaljson import json
+
 import urllib
 from six.moves.urllib import parse as urlparse
 
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 9b3022e0b0..c10320dedf 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -24,8 +24,6 @@ import synapse.util.stringutils as stringutils
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.types import create_requester
 
-from synapse.util.async import run_on_reactor
-
 from hashlib import sha1
 import hmac
 import logging
@@ -272,7 +270,6 @@ class RegisterRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def _do_password(self, request, register_json, session):
-        yield run_on_reactor()
         if (self.hs.config.enable_registration_captcha and
                 not session[LoginType.RECAPTCHA]):
             # captcha should've been done by this stage!
@@ -333,8 +330,6 @@ class RegisterRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def _do_shared_secret(self, request, register_json, session):
-        yield run_on_reactor()
-
         if not isinstance(register_json.get("mac", None), string_types):
             raise SynapseError(400, "Expected mac.")
         if not isinstance(register_json.get("user", None), string_types):
@@ -423,8 +418,6 @@ class CreateUserRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def _do_create(self, requester, user_json):
-        yield run_on_reactor()
-
         if "localpart" not in user_json:
             raise SynapseError(400, "Expected 'localpart' key.")
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 0b984987ed..e6ae5db79b 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -31,7 +31,7 @@ from synapse.http.servlet import (
 from six.moves.urllib import parse as urlparse
 
 import logging
-import simplejson as json
+from canonicaljson import json
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 3b822c0cb4..80dbc3c92e 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -26,7 +26,6 @@ from synapse.http.servlet import (
     RestServlet, assert_params_in_request,
     parse_json_object_from_request,
 )
-from synapse.util.async import run_on_reactor
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.threepids import check_3pid_allowed
 from ._base import client_v2_patterns, interactive_auth_handler
@@ -309,8 +308,6 @@ class ThreepidRestServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request):
-        yield run_on_reactor()
-
         requester = yield self.auth.get_user_by_req(request)
 
         threepids = yield self.datastore.user_get_threepids(
@@ -321,8 +318,6 @@ class ThreepidRestServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        yield run_on_reactor()
-
         body = parse_json_object_from_request(request)
 
         threePidCreds = body.get('threePidCreds')
@@ -374,8 +369,6 @@ class ThreepidDeleteRestServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        yield run_on_reactor()
-
         body = parse_json_object_from_request(request)
 
         required = ['medium', 'address']
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 5cab00aea9..97e7c0f7c6 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -32,7 +32,6 @@ from ._base import client_v2_patterns, interactive_auth_handler
 import logging
 import hmac
 from hashlib import sha1
-from synapse.util.async import run_on_reactor
 from synapse.util.ratelimitutils import FederationRateLimiter
 
 from six import string_types
@@ -191,8 +190,6 @@ class RegisterRestServlet(RestServlet):
     @interactive_auth_handler
     @defer.inlineCallbacks
     def on_POST(self, request):
-        yield run_on_reactor()
-
         body = parse_json_object_from_request(request)
 
         kind = "user"
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index a291cffbf1..d2aa47b326 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -33,7 +33,7 @@ from ._base import set_timeline_upper_limit
 import itertools
 import logging
 
-import simplejson as json
+from canonicaljson import json
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py
index 956bd5da75..e44d4276d2 100644
--- a/synapse/rest/media/v0/content_repository.py
+++ b/synapse/rest/media/v0/content_repository.py
@@ -22,8 +22,9 @@ from synapse.api.errors import (
 from twisted.protocols.basic import FileSender
 from twisted.web import server, resource
 
+from canonicaljson import json
+
 import base64
-import simplejson as json
 import logging
 import os
 import re
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index e592eef368..a9f84365b0 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -58,6 +58,7 @@ UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000
 
 class MediaRepository(object):
     def __init__(self, hs):
+        self.hs = hs
         self.auth = hs.get_auth()
         self.client = MatrixFederationHttpClient(hs)
         self.clock = hs.get_clock()
@@ -94,7 +95,7 @@ class MediaRepository(object):
             storage_providers.append(provider)
 
         self.media_storage = MediaStorage(
-            self.primary_base_path, self.filepaths, storage_providers,
+            self.hs, self.primary_base_path, self.filepaths, storage_providers,
         )
 
         self.clock.looping_call(
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index d23fe10b07..d6b8ebbedb 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -37,13 +37,15 @@ class MediaStorage(object):
     """Responsible for storing/fetching files from local sources.
 
     Args:
+        hs (synapse.server.Homeserver)
         local_media_directory (str): Base path where we store media on disk
         filepaths (MediaFilePaths)
         storage_providers ([StorageProvider]): List of StorageProvider that are
             used to fetch and store files.
     """
 
-    def __init__(self, local_media_directory, filepaths, storage_providers):
+    def __init__(self, hs, local_media_directory, filepaths, storage_providers):
+        self.hs = hs
         self.local_media_directory = local_media_directory
         self.filepaths = filepaths
         self.storage_providers = storage_providers
@@ -175,7 +177,8 @@ class MediaStorage(object):
             res = yield provider.fetch(path, file_info)
             if res:
                 with res:
-                    consumer = BackgroundFileConsumer(open(local_path, "w"))
+                    consumer = BackgroundFileConsumer(
+                        open(local_path, "w"), self.hs.get_reactor())
                     yield res.write_to_consumer(consumer)
                     yield consumer.wait()
                 defer.returnValue(local_path)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 565cef2b8d..adca490640 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -23,7 +23,8 @@ import re
 import shutil
 import sys
 import traceback
-import simplejson as json
+
+from canonicaljson import json
 
 from six.moves import urllib_parse as urlparse
 from six import string_types
diff --git a/synapse/server.py b/synapse/server.py
index 58dbf78437..c29c19289a 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -40,7 +40,7 @@ from synapse.federation.transport.client import TransportLayerClient
 from synapse.federation.transaction_queue import TransactionQueue
 from synapse.handlers import Handlers
 from synapse.handlers.appservice import ApplicationServicesHandler
-from synapse.handlers.auth import AuthHandler, MacaroonGeneartor
+from synapse.handlers.auth import AuthHandler, MacaroonGenerator
 from synapse.handlers.deactivate_account import DeactivateAccountHandler
 from synapse.handlers.devicemessage import DeviceMessageHandler
 from synapse.handlers.device import DeviceHandler
@@ -165,15 +165,19 @@ class HomeServer(object):
         'server_notices_sender',
     ]
 
-    def __init__(self, hostname, **kwargs):
+    def __init__(self, hostname, reactor=None, **kwargs):
         """
         Args:
             hostname : The hostname for the server.
         """
+        if not reactor:
+            from twisted.internet import reactor
+
+        self._reactor = reactor
         self.hostname = hostname
         self._building = {}
 
-        self.clock = Clock()
+        self.clock = Clock(reactor)
         self.distributor = Distributor()
         self.ratelimiter = Ratelimiter()
 
@@ -186,6 +190,12 @@ class HomeServer(object):
         self.datastore = DataStore(self.get_db_conn(), self)
         logger.info("Finished setting up.")
 
+    def get_reactor(self):
+        """
+        Fetch the Twisted reactor in use by this HomeServer.
+        """
+        return self._reactor
+
     def get_ip_from_request(self, request):
         # X-Forwarded-For is handled by our custom request type.
         return request.getClientIP()
@@ -261,7 +271,7 @@ class HomeServer(object):
         return AuthHandler(self)
 
     def build_macaroon_generator(self):
-        return MacaroonGeneartor(self)
+        return MacaroonGenerator(self)
 
     def build_device_handler(self):
         return DeviceHandler(self)
@@ -328,6 +338,7 @@ class HomeServer(object):
 
         return adbapi.ConnectionPool(
             name,
+            cp_reactor=self.get_reactor(),
             **self.db_config.get("args", {})
         )
 
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 284ec3c970..7034a61399 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -22,8 +22,9 @@ from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
+from canonicaljson import json
+
 import abc
-import simplejson as json
 import logging
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 12ea8a158c..4d32d0bdf6 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -15,8 +15,8 @@
 # limitations under the License.
 import logging
 import re
-import simplejson as json
 from twisted.internet import defer
+from canonicaljson import json
 
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 8af325a9f5..af18964510 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -12,14 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import synapse.util.async
 
 from ._base import SQLBaseStore
 from . import engines
 
 from twisted.internet import defer
 
-import simplejson as json
+from canonicaljson import json
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -92,7 +92,7 @@ class BackgroundUpdateStore(SQLBaseStore):
         logger.info("Starting background schema updates")
 
         while True:
-            yield synapse.util.async.sleep(
+            yield self.hs.get_clock().sleep(
                 self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
 
             try:
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index ce338514e8..968d2fed22 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -15,7 +15,7 @@
 
 import logging
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 
 from ._base import Cache
 from . import background_updates
@@ -70,7 +70,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         self._client_ip_looper = self._clock.looping_call(
             self._update_client_ips_batch, 5 * 1000
         )
-        reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
+        self.hs.get_reactor().addSystemEventTrigger(
+            "before", "shutdown", self._update_client_ips_batch
+        )
 
     def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
                          now=None):
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index a879e5bfc1..38addbf9c0 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 
 import logging
-import simplejson
+
+from canonicaljson import json
 
 from twisted.internet import defer
 
@@ -85,7 +86,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             )
             rows = []
             for destination, edu in remote_messages_by_destination.items():
-                edu_json = simplejson.dumps(edu)
+                edu_json = json.dumps(edu)
                 rows.append((destination, stream_id, now_ms, edu_json))
             txn.executemany(sql, rows)
 
@@ -177,7 +178,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                     " WHERE user_id = ?"
                 )
                 txn.execute(sql, (user_id,))
-                message_json = simplejson.dumps(messages_by_device["*"])
+                message_json = json.dumps(messages_by_device["*"])
                 for row in txn:
                     # Add the message for all devices for this user on this
                     # server.
@@ -199,7 +200,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                     # Only insert into the local inbox if the device exists on
                     # this server
                     device = row[0]
-                    message_json = simplejson.dumps(messages_by_device[device])
+                    message_json = json.dumps(messages_by_device[device])
                     messages_json_for_user[device] = message_json
 
             if messages_json_for_user:
@@ -253,7 +254,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(simplejson.loads(row[1]))
+                messages.append(json.loads(row[1]))
             if len(messages) < limit:
                 stream_pos = current_stream_id
             return (messages, stream_pos)
@@ -389,7 +390,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(simplejson.loads(row[1]))
+                messages.append(json.loads(row[1]))
             if len(messages) < limit:
                 stream_pos = current_stream_id
             return (messages, stream_pos)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d149d8392e..2ed9ada783 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import simplejson as json
 
 from twisted.internet import defer
 
@@ -21,6 +20,8 @@ from synapse.api.errors import StoreError
 from ._base import SQLBaseStore, Cache
 from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
 
+from canonicaljson import json
+
 from six import itervalues, iteritems
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index b146487943..181047c8b7 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -16,8 +16,7 @@ from twisted.internet import defer
 
 from synapse.util.caches.descriptors import cached
 
-from canonicaljson import encode_canonical_json
-import simplejson as json
+from canonicaljson import encode_canonical_json, json
 
 from ._base import SQLBaseStore
 
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index d0350ee5fe..05cb3f61ce 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -16,11 +16,11 @@
 
 from synapse.storage._base import SQLBaseStore, LoggingTransaction
 from twisted.internet import defer
-from synapse.util.async import sleep
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
 import logging
-import simplejson as json
+
+from canonicaljson import json
 
 from six import iteritems
 
@@ -84,6 +84,8 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         self.find_stream_orderings_looping_call = self._clock.looping_call(
             self._find_stream_orderings_for_times, 10 * 60 * 1000
         )
+        self._rotate_delay = 3
+        self._rotate_count = 10000
 
     @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
     def get_unread_event_push_actions_by_room_for_user(
@@ -800,7 +802,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
                 )
                 if caught_up:
                     break
-                yield sleep(5)
+                yield self.hs.get_clock().sleep(self._rotate_delay)
         finally:
             self._doing_notif_rotation = False
 
@@ -821,8 +823,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         txn.execute("""
             SELECT stream_ordering FROM event_push_actions
             WHERE stream_ordering > ?
-            ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
-        """, (old_rotate_stream_ordering,))
+            ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
+        """, (old_rotate_stream_ordering, self._rotate_count))
         stream_row = txn.fetchone()
         if stream_row:
             offset_stream_ordering, = stream_row
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index cb1082e864..d816d4883c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,8 @@ from functools import wraps
 import itertools
 import logging
 
-import simplejson as json
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.storage.events_worker import EventsWorkerStore
@@ -1044,7 +1045,6 @@ class EventsStore(EventsWorkerStore):
                 "event_edge_hashes",
                 "event_edges",
                 "event_forward_extremities",
-                "event_push_actions",
                 "event_reference_hashes",
                 "event_search",
                 "event_signatures",
@@ -1064,6 +1064,14 @@ class EventsStore(EventsWorkerStore):
                 [(ev.event_id,) for ev, _ in events_and_contexts]
             )
 
+        for table in (
+            "event_push_actions",
+        ):
+            txn.executemany(
+                "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
+                [(ev.event_id,) for ev, _ in events_and_contexts]
+            )
+
     def _store_event_txn(self, txn, events_and_contexts):
         """Insert new events into the event and event_json tables
 
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 32d9d00ffb..896225aab9 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -14,13 +14,14 @@
 # limitations under the License.
 from ._base import SQLBaseStore
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 
 from synapse.events import FrozenEvent
 from synapse.events.utils import prune_event
 
 from synapse.util.logcontext import (
     PreserveLoggingContext, make_deferred_yieldable, run_in_background,
+    LoggingContext,
 )
 from synapse.util.metrics import Measure
 from synapse.api.errors import SynapseError
@@ -28,7 +29,8 @@ from synapse.api.errors import SynapseError
 from collections import namedtuple
 
 import logging
-import simplejson as json
+
+from canonicaljson import json
 
 # these are only included to make the type annotations work
 from synapse.events import EventBase    # noqa: F401
@@ -145,6 +147,9 @@ class EventsWorkerStore(SQLBaseStore):
         missing_events_ids = [e for e in event_ids if e not in event_entry_map]
 
         if missing_events_ids:
+            log_ctx = LoggingContext.current_context()
+            log_ctx.record_event_fetch(len(missing_events_ids))
+
             missing_events = yield self._enqueue_events(
                 missing_events_ids,
                 check_redacted=check_redacted,
@@ -265,7 +270,7 @@ class EventsWorkerStore(SQLBaseStore):
                             except Exception:
                                 logger.exception("Failed to callback")
                 with PreserveLoggingContext():
-                    reactor.callFromThread(fire, event_list, row_dict)
+                    self.hs.get_reactor().callFromThread(fire, event_list, row_dict)
             except Exception as e:
                 logger.exception("do_fetch")
 
@@ -278,7 +283,7 @@ class EventsWorkerStore(SQLBaseStore):
 
                 if event_list:
                     with PreserveLoggingContext():
-                        reactor.callFromThread(fire, event_list)
+                        self.hs.get_reactor().callFromThread(fire, event_list)
 
     @defer.inlineCallbacks
     def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 2e2763126d..eae6027cee 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -19,8 +19,7 @@ from ._base import SQLBaseStore
 from synapse.api.errors import SynapseError, Codes
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
-from canonicaljson import encode_canonical_json
-import simplejson as json
+from canonicaljson import encode_canonical_json, json
 
 
 class FilteringStore(SQLBaseStore):
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index da05ccb027..b77402d295 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -20,7 +20,7 @@ from synapse.api.errors import SynapseError
 
 from ._base import SQLBaseStore
 
-import simplejson as json
+from canonicaljson import json
 
 
 # The category ID for the "default" category. We don't store as null in the
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 04a0b59a39..9e52e992b3 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -25,9 +25,10 @@ from synapse.push.baserules import list_with_base_rules
 from synapse.api.constants import EventTypes
 from twisted.internet import defer
 
+from canonicaljson import json
+
 import abc
 import logging
-import simplejson as json
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 307660b99a..c6def861cf 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -17,12 +17,11 @@
 from ._base import SQLBaseStore
 from twisted.internet import defer
 
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
 
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 
 import logging
-import simplejson as json
 import types
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index c93c228f6e..f230a3bab7 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -21,9 +21,10 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 from twisted.internet import defer
 
+from canonicaljson import json
+
 import abc
 import logging
-import simplejson as json
 
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9c9cf46e7f..0d18f6d869 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -623,7 +623,9 @@ class RegistrationStore(RegistrationWorkerStore,
         Removes the given user to the table of users who need to be parted from all the
         rooms they're in, effectively marking that user as fully deactivated.
         """
-        return self._simple_delete_one(
+        # XXX: This should be simple_delete_one but we failed to put a unique index on
+        # the table, so somehow duplicate entries have ended up in it.
+        return self._simple_delete(
             "users_pending_deactivation",
             keyvalues={
                 "user_id": user_id,
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index ea6a189185..ca0eb187e5 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -20,9 +20,10 @@ from synapse.storage._base import SQLBaseStore
 from synapse.storage.search import SearchStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
+from canonicaljson import json
+
 import collections
 import logging
-import simplejson as json
 import re
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 48a88f755e..8fc9549a75 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -28,7 +28,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.types import get_domain_from_id
 
 import logging
-import simplejson as json
+from canonicaljson import json
 
 from six import itervalues, iteritems
 
@@ -455,7 +455,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         defer.returnValue(joined_hosts)
 
-    @cached(max_entries=10000, iterable=True)
+    @cached(max_entries=10000)
     def _get_joined_hosts_cache(self, room_id):
         return _JoinedHostsCache(self, room_id)
 
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index f0fa5d7631..9b77c45318 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -16,7 +16,7 @@
 from collections import namedtuple
 import logging
 import re
-import simplejson as json
+from canonicaljson import json
 
 from six import string_types
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 85b8ec2b8f..cd9821c270 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -526,10 +526,23 @@ class StateGroupWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def _get_state_for_groups(self, groups, types=None):
-        """Given list of groups returns dict of group -> list of state events
-        with matching types. `types` is a list of `(type, state_key)`, where
-        a `state_key` of None matches all state_keys. If `types` is None then
-        all events are returned.
+        """Gets the state at each of a list of state groups, optionally
+        filtering by type/state_key
+
+        Args:
+            groups (iterable[int]): list of state groups for which we want
+                to get the state.
+            types (None|iterable[(str, None|str)]):
+                indicates the state type/keys required. If None, the whole
+                state is fetched and returned.
+
+                Otherwise, each entry should be a `(type, state_key)` tuple to
+                include in the response. A `state_key` of None is a wildcard
+                meaning that we require all state with that type.
+
+        Returns:
+            Deferred[dict[int, dict[(type, state_key), EventBase]]]
+                a dictionary mapping from state group to state dictionary.
         """
         if types:
             types = frozenset(types)
@@ -538,7 +551,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         if types is not None:
             for group in set(groups):
                 state_dict_ids, _, got_all = self._get_some_state_from_cache(
-                    group, types
+                    group, types,
                 )
                 results[group] = state_dict_ids
 
@@ -559,26 +572,40 @@ class StateGroupWorkerStore(SQLBaseStore):
             # Okay, so we have some missing_types, lets fetch them.
             cache_seq_num = self._state_group_cache.sequence
 
+            # the DictionaryCache knows if it has *all* the state, but
+            # does not know if it has all of the keys of a particular type,
+            # which makes wildcard lookups expensive unless we have a complete
+            # cache. Hence, if we are doing a wildcard lookup, populate the
+            # cache fully so that we can do an efficient lookup next time.
+
+            if types and any(k is None for (t, k) in types):
+                types_to_fetch = None
+            else:
+                types_to_fetch = types
+
             group_to_state_dict = yield self._get_state_groups_from_groups(
-                missing_groups, types
+                missing_groups, types_to_fetch,
             )
 
-            # Now we want to update the cache with all the things we fetched
-            # from the database.
             for group, group_state_dict in iteritems(group_to_state_dict):
                 state_dict = results[group]
 
-                state_dict.update(
-                    ((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
-                    for k, v in iteritems(group_state_dict)
-                )
-
+                # update the result, filtering by `types`.
+                if types:
+                    for k, v in iteritems(group_state_dict):
+                        (typ, _) = k
+                        if k in types or (typ, None) in types:
+                            state_dict[k] = v
+                else:
+                    state_dict.update(group_state_dict)
+
+                # update the cache with all the things we fetched from the
+                # database.
                 self._state_group_cache.update(
                     cache_seq_num,
                     key=group,
-                    value=state_dict,
-                    full=(types is None),
-                    known_absent=types,
+                    value=group_state_dict,
+                    fetched_keys=types_to_fetch,
                 )
 
         defer.returnValue(results)
@@ -685,7 +712,6 @@ class StateGroupWorkerStore(SQLBaseStore):
                 self._state_group_cache.sequence,
                 key=state_group,
                 value=dict(current_state_ids),
-                full=True,
             )
 
             return state_group
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 6671d3cfca..04d123ed95 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -19,7 +19,8 @@ from synapse.storage.account_data import AccountDataWorkerStore
 from synapse.util.caches.descriptors import cached
 from twisted.internet import defer
 
-import simplejson as json
+from canonicaljson import json
+
 import logging
 
 from six.moves import range
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index e485d19b84..acbc03446e 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -19,12 +19,11 @@ from synapse.util.caches.descriptors import cached
 from twisted.internet import defer
 import six
 
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
 
 from collections import namedtuple
 
 import logging
-import simplejson as json
 
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index fc11e26623..e9886ef299 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,15 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.logcontext import PreserveLoggingContext
-
-from twisted.internet import defer, reactor, task
-
-import time
 import logging
-
 from itertools import islice
 
+import attr
+from twisted.internet import defer, task
+
+from synapse.util.logcontext import PreserveLoggingContext
+
 logger = logging.getLogger(__name__)
 
 
@@ -31,16 +30,27 @@ def unwrapFirstError(failure):
     return failure.value.subFailure
 
 
+@attr.s
 class Clock(object):
-    """A small utility that obtains current time-of-day so that time may be
-    mocked during unit-tests.
+    """
+    A Clock wraps a Twisted reactor and provides utilities on top of it.
 
-    TODO(paul): Also move the sleep() functionality into it
+    Args:
+        reactor: The Twisted reactor to use.
     """
+    _reactor = attr.ib()
+
+    @defer.inlineCallbacks
+    def sleep(self, seconds):
+        d = defer.Deferred()
+        with PreserveLoggingContext():
+            self._reactor.callLater(seconds, d.callback, seconds)
+            res = yield d
+        defer.returnValue(res)
 
     def time(self):
         """Returns the current system time in seconds since epoch."""
-        return time.time()
+        return self._reactor.seconds()
 
     def time_msec(self):
         """Returns the current system time in miliseconds since epoch."""
@@ -56,6 +66,7 @@ class Clock(object):
             msec(float): How long to wait between calls in milliseconds.
         """
         call = task.LoopingCall(f)
+        call.clock = self._reactor
         call.start(msec / 1000.0, now=False)
         return call
 
@@ -73,7 +84,7 @@ class Clock(object):
                 callback(*args, **kwargs)
 
         with PreserveLoggingContext():
-            return reactor.callLater(delay, wrapped_callback, *args, **kwargs)
+            return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
 
     def cancel_call_later(self, timer, ignore_errs=False):
         try:
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 9dd4e6b5bc..1668df4ce6 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -13,15 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.defer import CancelledError
 from twisted.python import failure
 
 from .logcontext import (
     PreserveLoggingContext, make_deferred_yieldable, run_in_background
 )
-from synapse.util import logcontext, unwrapFirstError
+from synapse.util import logcontext, unwrapFirstError, Clock
 
 from contextlib import contextmanager
 
@@ -32,22 +31,6 @@ from six.moves import range
 logger = logging.getLogger(__name__)
 
 
-@defer.inlineCallbacks
-def sleep(seconds):
-    d = defer.Deferred()
-    with PreserveLoggingContext():
-        reactor.callLater(seconds, d.callback, seconds)
-        res = yield d
-    defer.returnValue(res)
-
-
-def run_on_reactor():
-    """ This will cause the rest of the function to be invoked upon the next
-    iteration of the main loop
-    """
-    return sleep(0)
-
-
 class ObservableDeferred(object):
     """Wraps a deferred object so that we can add observer deferreds. These
     observer deferreds do not affect the callback chain of the original
@@ -180,13 +163,18 @@ class Linearizer(object):
             # do some work.
 
     """
-    def __init__(self, name=None):
+    def __init__(self, name=None, clock=None):
         if name is None:
             self.name = id(self)
         else:
             self.name = name
         self.key_to_defer = {}
 
+        if not clock:
+            from twisted.internet import reactor
+            clock = Clock(reactor)
+        self._clock = clock
+
     @defer.inlineCallbacks
     def queue(self, key):
         # If there is already a deferred in the queue, we pull it out so that
@@ -227,7 +215,7 @@ class Linearizer(object):
             # the context manager, but it needs to happen while we hold the
             # lock, and the context manager's exit code must be synchronous,
             # so actually this is the only sensible place.
-            yield run_on_reactor()
+            yield self._clock.sleep(0)
 
         else:
             logger.info("Acquired uncontended linearizer lock %r for key %r",
@@ -404,7 +392,7 @@ class DeferredTimeoutError(Exception):
     """
 
 
-def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
+def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
     """
     Add a timeout to a deferred by scheduling it to be cancelled after
     timeout seconds.
@@ -419,6 +407,7 @@ def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
     Args:
         deferred (defer.Deferred): deferred to be timed out
         timeout (Number): seconds to time out after
+        reactor (twisted.internet.reactor): the Twisted reactor to use
 
         on_timeout_cancel (callable): A callable which is called immediately
             after the deferred times out, and not if this deferred is
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index bdc21e348f..95793d466d 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -107,29 +107,28 @@ class DictionaryCache(object):
         self.sequence += 1
         self.cache.clear()
 
-    def update(self, sequence, key, value, full=False, known_absent=None):
+    def update(self, sequence, key, value, fetched_keys=None):
         """Updates the entry in the cache
 
         Args:
             sequence
-            key
-            value (dict): The value to update the cache with.
-            full (bool): Whether the given value is the full dict, or just a
-                partial subset there of. If not full then any existing entries
-                for the key will be updated.
-            known_absent (set): Set of keys that we know don't exist in the full
-                dict.
+            key (K)
+            value (dict[X,Y]): The value to update the cache with.
+            fetched_keys (None|set[X]): All of the dictionary keys which were
+                fetched from the database.
+
+                If None, this is the complete value for key K. Otherwise, it
+                is used to infer a list of keys which we know don't exist in
+                the full dict.
         """
         self.check_thread()
         if self.sequence == sequence:
             # Only update the cache if the caches sequence number matches the
             # number that the cache had before the SELECT was started (SYN-369)
-            if known_absent is None:
-                known_absent = set()
-            if full:
-                self._insert(key, value, known_absent)
+            if fetched_keys is None:
+                self._insert(key, value, set())
             else:
-                self._update_or_insert(key, value, known_absent)
+                self._update_or_insert(key, value, fetched_keys)
 
     def _update_or_insert(self, key, value, known_absent):
         # We pop and reinsert as we need to tell the cache the size may have
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 817118e30f..0fb8620001 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -78,7 +78,8 @@ class StreamChangeCache(object):
             not_known_entities = set(entities) - set(self._entity_to_key)
 
             result = (
-                set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
+                {self._cache[k] for k in self._cache.islice(
+                    start=self._cache.bisect_right(stream_pos))}
                 .intersection(entities)
                 .union(not_known_entities)
             )
@@ -113,7 +114,8 @@ class StreamChangeCache(object):
         assert type(stream_pos) is int
 
         if stream_pos >= self._earliest_known_stream_pos:
-            return self._cache.values()[self._cache.bisect_right(stream_pos) :]
+            return [self._cache[k] for k in self._cache.islice(
+                start=self._cache.bisect_right(stream_pos))]
         else:
             return None
 
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 3380970e4e..c78801015b 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import threads, reactor
+from twisted.internet import threads
 
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
@@ -27,6 +27,7 @@ class BackgroundFileConsumer(object):
     Args:
         file_obj (file): The file like object to write to. Closed when
             finished.
+        reactor (twisted.internet.reactor): the Twisted reactor to use
     """
 
     # For PushProducers pause if we have this many unwritten slices
@@ -34,9 +35,11 @@ class BackgroundFileConsumer(object):
     # And resume once the size of the queue is less than this
     _RESUME_ON_QUEUE_SIZE = 2
 
-    def __init__(self, file_obj):
+    def __init__(self, file_obj, reactor):
         self._file_obj = file_obj
 
+        self._reactor = reactor
+
         # Producer we're registered with
         self._producer = None
 
@@ -71,7 +74,10 @@ class BackgroundFileConsumer(object):
         self._producer = producer
         self.streaming = streaming
         self._finished_deferred = run_in_background(
-            threads.deferToThread, self._writer
+            threads.deferToThreadPool,
+            self._reactor,
+            self._reactor.getThreadPool(),
+            self._writer,
         )
         if not streaming:
             self._producer.resumeProducing()
@@ -109,7 +115,7 @@ class BackgroundFileConsumer(object):
                 # producer.
                 if self._producer and self._paused_producer:
                     if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
-                        reactor.callFromThread(self._resume_paused_producer)
+                        self._reactor.callFromThread(self._resume_paused_producer)
 
                 bytes = self._bytes_queue.get()
 
@@ -121,7 +127,7 @@ class BackgroundFileConsumer(object):
                 # If its a pull producer then we need to explicitly ask for
                 # more stuff.
                 if not self.streaming and self._producer:
-                    reactor.callFromThread(self._producer.resumeProducing)
+                    self._reactor.callFromThread(self._producer.resumeProducing)
         except Exception as e:
             self._write_exception = e
             raise
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 15f0a7ba9e..535e7d0e7a 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from frozendict import frozendict
-import simplejson as json
+from canonicaljson import json
 
 from six import string_types
 
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a58c723403..df2b71b791 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -60,6 +60,7 @@ class LoggingContext(object):
     __slots__ = [
         "previous_context", "name", "ru_stime", "ru_utime",
         "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
+        "evt_db_fetch_count",
         "usage_start",
         "main_thread", "alive",
         "request", "tag",
@@ -90,6 +91,9 @@ class LoggingContext(object):
         def add_database_scheduled(self, sched_sec):
             pass
 
+        def record_event_fetch(self, event_count):
+            pass
+
         def __nonzero__(self):
             return False
         __bool__ = __nonzero__  # python3
@@ -109,6 +113,9 @@ class LoggingContext(object):
         # sec spent waiting for db txns to be scheduled
         self.db_sched_duration_sec = 0
 
+        # number of events this thread has fetched from the db
+        self.evt_db_fetch_count = 0
+
         # If alive has the thread resource usage when the logcontext last
         # became active.
         self.usage_start = None
@@ -243,6 +250,14 @@ class LoggingContext(object):
         """
         self.db_sched_duration_sec += sched_sec
 
+    def record_event_fetch(self, event_count):
+        """Record a number of events being fetched from the db
+
+        Args:
+            event_count (int): number of events being fetched
+        """
+        self.evt_db_fetch_count += event_count
+
 
 class LoggingContextFilter(logging.Filter):
     """Logging filter that adds values from the current logging context to each
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 0ab63c3d7d..c5a45cef7c 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
 
 from synapse.api.errors import LimitExceededError
 
-from synapse.util.async import sleep
 from synapse.util.logcontext import (
     run_in_background, make_deferred_yieldable,
     PreserveLoggingContext,
@@ -153,7 +152,7 @@ class _PerHostRatelimiter(object):
                 "Ratelimit [%s]: sleeping req",
                 id(request_id),
             )
-            ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0)
+            ret_defer = run_in_background(self.clock.sleep, self.sleep_msec / 1000.0)
 
             self.sleeping_requests.add(request_id)