path: root/synapse/handlers
author     Richard van der Hoff <1389908+richvdh@users.noreply.github.com>  2018-07-04 07:13:38 +0100
committer  GitHub <noreply@github.com>  2018-07-04 07:13:38 +0100
commit     a4ab49137192e7994b231c6a8204baa452c9a8d6 (patch)
tree       680f11091640bf6dedeb75922f2a90bafcc27d9f /synapse/handlers
parent     Remove event re-signing hacks (diff)
parent     Reject invalid server names (#3480) (diff)
download   synapse-a4ab49137192e7994b231c6a8204baa452c9a8d6.tar.xz
Merge branch 'develop' into rav/drop_re_signing_hacks
Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/auth.py                  52
-rw-r--r--  synapse/handlers/deactivate_account.py    39
-rw-r--r--  synapse/handlers/e2e_keys.py               7
-rw-r--r--  synapse/handlers/federation.py           204
-rw-r--r--  synapse/handlers/identity.py              58
-rw-r--r--  synapse/handlers/message.py               22
-rw-r--r--  synapse/handlers/presence.py               4
-rw-r--r--  synapse/handlers/register.py               5
-rw-r--r--  synapse/handlers/room.py                   8
-rw-r--r--  synapse/handlers/search.py                14
-rw-r--r--  synapse/handlers/sync.py                   2
-rw-r--r--  synapse/handlers/user_directory.py         9
12 files changed, 313 insertions, 111 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 3c0051586d..cbef1f2770 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,8 +13,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 from twisted.internet import defer, threads
 
+from canonicaljson import json
+
 from ._base import BaseHandler
 from synapse.api.constants import LoginType
 from synapse.api.errors import (
@@ -23,7 +26,6 @@ from synapse.api.errors import (
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
-from synapse.util.async import run_on_reactor
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logcontext import make_deferred_yieldable
 
@@ -32,7 +34,7 @@ from twisted.web.client import PartialDownloadError
 import logging
 import bcrypt
 import pymacaroons
-import simplejson
+import attr
 
 import synapse.util.stringutils as stringutils
 
@@ -402,7 +404,7 @@ class AuthHandler(BaseHandler):
         except PartialDownloadError as pde:
             # Twisted is silly
             data = pde.response
-            resp_body = simplejson.loads(data)
+            resp_body = json.loads(data)
 
         if 'success' in resp_body:
             # Note that we do NOT check the hostname here: we explicitly
@@ -423,15 +425,11 @@ class AuthHandler(BaseHandler):
     def _check_msisdn(self, authdict, _):
         return self._check_threepid('msisdn', authdict)
 
-    @defer.inlineCallbacks
     def _check_dummy_auth(self, authdict, _):
-        yield run_on_reactor()
-        defer.returnValue(True)
+        return defer.succeed(True)
 
     @defer.inlineCallbacks
     def _check_threepid(self, medium, authdict):
-        yield run_on_reactor()
-
         if 'threepid_creds' not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
@@ -825,6 +823,15 @@ class AuthHandler(BaseHandler):
         if medium == 'email':
             address = address.lower()
 
+        identity_handler = self.hs.get_handlers().identity_handler
+        yield identity_handler.unbind_threepid(
+            user_id,
+            {
+                'medium': medium,
+                'address': address,
+            },
+        )
+
         ret = yield self.store.user_delete_threepid(
             user_id, medium, address,
         )
@@ -849,7 +856,11 @@ class AuthHandler(BaseHandler):
             return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
                                  bcrypt.gensalt(self.bcrypt_rounds))
 
-        return make_deferred_yieldable(threads.deferToThread(_do_hash))
+        return make_deferred_yieldable(
+            threads.deferToThreadPool(
+                self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
+            ),
+        )
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
@@ -869,16 +880,21 @@ class AuthHandler(BaseHandler):
             )
 
         if stored_hash:
-            return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
+            return make_deferred_yieldable(
+                threads.deferToThreadPool(
+                    self.hs.get_reactor(),
+                    self.hs.get_reactor().getThreadPool(),
+                    _do_validate_hash,
+                ),
+            )
         else:
             return defer.succeed(False)
 
 
-class MacaroonGeneartor(object):
-    def __init__(self, hs):
-        self.clock = hs.get_clock()
-        self.server_name = hs.config.server_name
-        self.macaroon_secret_key = hs.config.macaroon_secret_key
+@attr.s
+class MacaroonGenerator(object):
+
+    hs = attr.ib()
 
     def generate_access_token(self, user_id, extra_caveats=None):
         extra_caveats = extra_caveats or []
@@ -896,7 +912,7 @@ class MacaroonGeneartor(object):
     def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
         macaroon = self._generate_base_macaroon(user_id)
         macaroon.add_first_party_caveat("type = login")
-        now = self.clock.time_msec()
+        now = self.hs.get_clock().time_msec()
         expiry = now + duration_in_ms
         macaroon.add_first_party_caveat("time < %d" % (expiry,))
         return macaroon.serialize()
@@ -908,9 +924,9 @@ class MacaroonGeneartor(object):
 
     def _generate_base_macaroon(self, user_id):
         macaroon = pymacaroons.Macaroon(
-            location=self.server_name,
+            location=self.hs.config.server_name,
             identifier="key",
-            key=self.macaroon_secret_key)
+            key=self.hs.config.macaroon_secret_key)
         macaroon.add_first_party_caveat("gen = 1")
         macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
         return macaroon
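
The MacaroonGenerator hunk above fixes the "MacaroonGeneartor" typo and, via
attrs, keeps only a reference to the HomeServer, reading the clock and config
lazily. For context, here is a minimal sketch of how such tokens are minted
and verified with pymacaroons; the server name and secret are placeholder
values, not anything from a real config.

    import pymacaroons

    key = "not-a-real-secret"  # stands in for hs.config.macaroon_secret_key

    macaroon = pymacaroons.Macaroon(
        location="example.server", identifier="key", key=key)
    macaroon.add_first_party_caveat("gen = 1")
    macaroon.add_first_party_caveat("user_id = @alice:example.server")
    token = macaroon.serialize()

    # Verification checks every first-party caveat against a satisfier.
    v = pymacaroons.Verifier()
    v.satisfy_exact("gen = 1")
    v.satisfy_exact("user_id = @alice:example.server")
    assert v.verify(pymacaroons.Macaroon.deserialize(token), key)
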
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index c5e92f6214..a84b7b8b80 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -12,11 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 
 from ._base import BaseHandler
 from synapse.types import UserID, create_requester
 from synapse.util.logcontext import run_in_background
+from synapse.api.errors import SynapseError
 
 import logging
 
@@ -30,6 +31,7 @@ class DeactivateAccountHandler(BaseHandler):
         self._auth_handler = hs.get_auth_handler()
         self._device_handler = hs.get_device_handler()
         self._room_member_handler = hs.get_room_member_handler()
+        self._identity_handler = hs.get_handlers().identity_handler
         self.user_directory_handler = hs.get_user_directory_handler()
 
         # Flag that indicates whether the process to part users from rooms is running
@@ -37,14 +39,15 @@ class DeactivateAccountHandler(BaseHandler):
 
         # Start the user parter loop so it can resume parting users from rooms where
         # it left off (if it has work left to do).
-        reactor.callWhenRunning(self._start_user_parting)
+        hs.get_reactor().callWhenRunning(self._start_user_parting)
 
     @defer.inlineCallbacks
-    def deactivate_account(self, user_id):
+    def deactivate_account(self, user_id, erase_data):
         """Deactivate a user's account
 
         Args:
             user_id (str): ID of user to be deactivated
+            erase_data (bool): whether to GDPR-erase the user's data
 
         Returns:
             Deferred
@@ -52,14 +55,35 @@ class DeactivateAccountHandler(BaseHandler):
         # FIXME: Theoretically there is a race here wherein user resets
         # password using threepid.
 
-        # first delete any devices belonging to the user, which will also
+        # delete threepids first. We remove these from the IS so that, if
+        # this fails, the user is left active and can try again.
+        # Ideally we would prevent password resets and then do this in the
+        # background thread.
+        threepids = yield self.store.user_get_threepids(user_id)
+        for threepid in threepids:
+            try:
+                yield self._identity_handler.unbind_threepid(
+                    user_id,
+                    {
+                        'medium': threepid['medium'],
+                        'address': threepid['address'],
+                    },
+                )
+            except Exception:
+                # Do we want this to be a fatal error or should we carry on?
+                logger.exception("Failed to remove threepid from ID server")
+                raise SynapseError(400, "Failed to remove threepid from ID server")
+            yield self.store.user_delete_threepid(
+                user_id, threepid['medium'], threepid['address'],
+            )
+
+        # delete any devices belonging to the user, which will also
         # delete corresponding access tokens.
         yield self._device_handler.delete_all_devices_for_user(user_id)
         # then delete any remaining access tokens which weren't associated with
         # a device.
         yield self._auth_handler.delete_access_tokens_for_user(user_id)
 
-        yield self.store.user_delete_threepids(user_id)
         yield self.store.user_set_password_hash(user_id, None)
 
         # Add the user to a table of users pending deactivation (ie.
@@ -69,6 +93,11 @@ class DeactivateAccountHandler(BaseHandler):
         # delete from user directory
         yield self.user_directory_handler.handle_user_deactivated(user_id)
 
+        # Mark the user as erased, if they asked for that
+        if erase_data:
+            logger.info("Marking %s as erased", user_id)
+            yield self.store.mark_user_erased(user_id)
+
         # Now start the process that goes through that list and
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
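
A hedged usage sketch of the new deactivate_account signature: callers must
now say whether the deactivation is also a GDPR erasure. hs is assumed to be
a configured HomeServer exposing get_deactivate_account_handler(), as
elsewhere in synapse.

    from twisted.internet import defer

    @defer.inlineCallbacks
    def deactivate_and_erase(hs, user_id):
        handler = hs.get_deactivate_account_handler()
        # erase_data=True additionally calls store.mark_user_erased, so
        # federation hands out redacted copies of the user's old events.
        yield handler.deactivate_account(user_id, erase_data=True)
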
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 8a2d177539..62b4892a4e 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -14,10 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import simplejson as json
 import logging
 
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
 from twisted.internet import defer
 from six import iteritems
 
@@ -80,7 +79,7 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids
 
-        # Firt get local devices.
+        # First get local devices.
         failures = {}
         results = {}
         if local_query:
@@ -357,7 +356,7 @@ def _exception_to_failure(e):
     # include ConnectionRefused and other errors
     #
     # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
-    # give a string for e.message, which simplejson then fails to serialize.
+    # give a string for e.message, which json then fails to serialize.
     return {
         "status": 503, "message": str(e.message),
     }
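
The simplejson import gives way to the json module re-exported by
canonicaljson, which synapse already depends on for signing. A quick
illustration of why canonical encoding matters for key objects (keys sorted,
whitespace stripped, so equal dicts always serialize to the same bytes):

    from canonicaljson import encode_canonical_json

    # Same dict, same bytes, regardless of insertion order.
    assert encode_canonical_json({"b": 1, "a": 2}) == b'{"a":2,"b":1}'
    assert encode_canonical_json({"a": 2, "b": 1}) == b'{"a":2,"b":1}'
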
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 60b97b140e..6350a4c204 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -39,11 +39,12 @@ from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError, logcontext
 from synapse.util.metrics import measure_func
 from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, Linearizer
+from synapse.util.async import Linearizer
 from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
     compute_event_signature, add_hashes_and_signatures,
 )
+from synapse.state import resolve_events_with_factory
 from synapse.types import UserID, get_domain_from_id
 
 from synapse.events.utils import prune_event
@@ -89,7 +90,9 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def on_receive_pdu(self, origin, pdu, get_missing=True):
+    def on_receive_pdu(
+            self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+    ):
         """ Process a PDU received via a federation /send/ transaction, or
         via backfill of missing prev_events
 
@@ -103,8 +106,10 @@ class FederationHandler(BaseHandler):
         """
 
         # We reprocess pdus when we have seen them only as outliers
-        existing = yield self.get_persisted_pdu(
-            origin, pdu.event_id, do_auth=False
+        existing = yield self.store.get_event(
+            pdu.event_id,
+            allow_none=True,
+            allow_rejected=True,
         )
 
         # FIXME: Currently we fetch an event again when we already have it
@@ -161,14 +166,11 @@ class FederationHandler(BaseHandler):
                     "Ignoring PDU %s for room %s from %s as we've left the room!",
                     pdu.event_id, pdu.room_id, origin,
                 )
-                return
+                defer.returnValue(None)
 
         state = None
-
         auth_chain = []
 
-        fetch_state = False
-
         # Get missing pdus if necessary.
         if not pdu.internal_metadata.is_outlier():
             # We only backfill backwards to the min depth.
@@ -223,26 +225,60 @@ class FederationHandler(BaseHandler):
                         list(prevs - seen)[:5],
                     )
 
-            if prevs - seen:
-                logger.info(
-                    "Still missing %d events for room %r: %r...",
-                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+            if sent_to_us_directly and prevs - seen:
+                # If they have sent it to us directly, and the server
+                # isn't telling us about the prev_events that the
+                # message references, we explode
+                raise FederationError(
+                    "ERROR",
+                    403,
+                    (
+                        "Your server isn't divulging details about prev_events "
+                        "referenced in this event."
+                    ),
+                    affected=pdu.event_id,
                 )
-                fetch_state = True
+            elif prevs - seen:
+                # Calculate the state of the previous events, and
+                # de-conflict them to find the current state.
+                state_groups = []
+                auth_chains = set()
+                try:
+                    # Get the state of the events we know about
+                    ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+                    state_groups.append(ours)
+
+                    # Ask the remote server for the states we don't
+                    # know about
+                    for p in prevs - seen:
+                        state, got_auth_chain = (
+                            yield self.replication_layer.get_state_for_room(
+                                origin, pdu.room_id, p
+                            )
+                        )
+                        auth_chains.update(got_auth_chain)
+                        state_group = {(x.type, x.state_key): x.event_id for x in state}
+                        state_groups.append(state_group)
+
+                    # Resolve any conflicting state
+                    def fetch(ev_ids):
+                        return self.store.get_events(
+                            ev_ids, get_prev_content=False, check_redacted=False
+                        )
 
-        if fetch_state:
-            # We need to get the state at this event, since we haven't
-            # processed all the prev events.
-            logger.debug(
-                "_handle_new_pdu getting state for %s",
-                pdu.room_id
-            )
-            try:
-                state, auth_chain = yield self.replication_layer.get_state_for_room(
-                    origin, pdu.room_id, pdu.event_id,
-                )
-            except Exception:
-                logger.exception("Failed to get state for event: %s", pdu.event_id)
+                    state_map = yield resolve_events_with_factory(
+                        state_groups, {pdu.event_id: pdu}, fetch
+                    )
+
+                    state = (yield self.store.get_events(state_map.values())).values()
+                    auth_chain = list(auth_chains)
+                except Exception:
+                    raise FederationError(
+                        "ERROR",
+                        403,
+                        "We can't get valid state history.",
+                        affected=pdu.event_id,
+                    )
 
         yield self._process_received_pdu(
             origin,
@@ -320,11 +356,17 @@ class FederationHandler(BaseHandler):
 
         for e in missing_events:
             logger.info("Handling found event %s", e.event_id)
-            yield self.on_receive_pdu(
-                origin,
-                e,
-                get_missing=False
-            )
+            try:
+                yield self.on_receive_pdu(
+                    origin,
+                    e,
+                    get_missing=False
+                )
+            except FederationError as e:
+                if e.code == 403:
+                    logger.warn("Event %s failed history check.", e.affected)
+                else:
+                    raise
 
     @log_function
     @defer.inlineCallbacks
@@ -458,6 +500,47 @@ class FederationHandler(BaseHandler):
     @measure_func("_filter_events_for_server")
     @defer.inlineCallbacks
     def _filter_events_for_server(self, server_name, room_id, events):
+        """Filter the given events for the given server, redacting those the
+        server can't see.
+
+        Assumes the server is currently in the room.
+
+        Returns:
+            list[FrozenEvent]
+        """
+        # First let's check to see if all the events have a history visibility
+        # of "shared" or "world_readable". If that's the case then we don't
+        # need to check membership (as we know the server is in the room).
+        event_to_state_ids = yield self.store.get_state_ids_for_events(
+            frozenset(e.event_id for e in events),
+            types=(
+                (EventTypes.RoomHistoryVisibility, ""),
+            )
+        )
+
+        visibility_ids = set()
+        for sids in event_to_state_ids.itervalues():
+            hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
+            if hist:
+                visibility_ids.add(hist)
+
+        # If we failed to find any history visibility events then the default
+        # is "shared" visiblity.
+        if not visibility_ids:
+            defer.returnValue(events)
+
+        event_map = yield self.store.get_events(visibility_ids)
+        all_open = all(
+            e.content.get("history_visibility") in (None, "shared", "world_readable")
+            for e in event_map.itervalues()
+        )
+
+        if all_open:
+            defer.returnValue(events)
+
+        # Ok, so we're dealing with events that have non-trivial visibility
+        # rules, so we need to also get the memberships of the room.
+
         event_to_state_ids = yield self.store.get_state_ids_for_events(
             frozenset(e.event_id for e in events),
             types=(
@@ -493,7 +576,20 @@ class FederationHandler(BaseHandler):
             for e_id, key_to_eid in event_to_state_ids.iteritems()
         }
 
+        erased_senders = yield self.store.are_users_erased(
+            (e.sender for e in events),
+        )
+
         def redact_disallowed(event, state):
+            # if the sender has been erased (GDPR Art. 17), always return a redacted
+            # copy of the event.
+            if erased_senders[event.sender]:
+                logger.info(
+                    "Sender of %s has been erased, redacting",
+                    event.event_id,
+                )
+                return prune_event(event)
+
             if not state:
                 return event
 
@@ -1371,8 +1467,6 @@ class FederationHandler(BaseHandler):
     def get_state_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
-
         state_groups = yield self.store.get_state_groups(
             room_id, [event_id]
         )
@@ -1403,8 +1497,6 @@ class FederationHandler(BaseHandler):
     def get_state_ids_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
-
         state_groups = yield self.store.get_state_groups_ids(
             room_id, [event_id]
         )
@@ -1446,11 +1538,20 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def get_persisted_pdu(self, origin, event_id, do_auth=True):
-        """ Get a PDU from the database with given origin and id.
+    def get_persisted_pdu(self, origin, event_id):
+        """Get an event from the database for the given server.
+
+        Args:
+            origin (str): hostname of server which is requesting the event; we
+               will check that the server is allowed to see it.
+            event_id (str): id of the event being requested
 
         Returns:
-            Deferred: Results in a `Pdu`.
+            Deferred[EventBase|None]: None if we know nothing about the event;
+                otherwise the (possibly-redacted) event.
+
+        Raises:
+            AuthError if the server is not currently in the room
         """
         event = yield self.store.get_event(
             event_id,
@@ -1459,20 +1560,17 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            if do_auth:
-                in_room = yield self.auth.check_host_in_room(
-                    event.room_id,
-                    origin
-                )
-                if not in_room:
-                    raise AuthError(403, "Host not in room.")
-
-                events = yield self._filter_events_for_server(
-                    origin, event.room_id, [event]
-                )
-
-                event = events[0]
+            in_room = yield self.auth.check_host_in_room(
+                event.room_id,
+                origin
+            )
+            if not in_room:
+                raise AuthError(403, "Host not in room.")
 
+            events = yield self._filter_events_for_server(
+                origin, event.room_id, [event]
+            )
+            event = events[0]
             defer.returnValue(event)
         else:
             defer.returnValue(None)
@@ -1751,6 +1849,10 @@ class FederationHandler(BaseHandler):
             min_depth=min_depth,
         )
 
+        missing_events = yield self._filter_events_for_server(
+            origin, room_id, missing_events,
+        )
+
         defer.returnValue(missing_events)
 
     @defer.inlineCallbacks
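
Two themes run through the federation.py changes: missing prev_events are now
de-conflicted with resolve_events_with_factory instead of trusting the
remote's state wholesale, and _filter_events_for_server learns about erased
users. The erasure check reduces to roughly the following sketch, where
erased_senders mirrors the dict of user_id -> bool returned by
store.are_users_erased and prune_event is synapse's redaction helper:

    from synapse.events.utils import prune_event

    def redact_if_erased(event, erased_senders):
        # Erased senders get a redacted copy: prune_event strips the
        # event down to the fields that must survive redaction.
        if erased_senders.get(event.sender):
            return prune_event(event)
        return event
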
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 91a0898860..277c2b7760 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,7 +19,7 @@
 
 import logging
 
-import simplejson as json
+from canonicaljson import json
 
 from twisted.internet import defer
 
@@ -26,7 +27,6 @@ from synapse.api.errors import (
     MatrixCodeMessageException, CodeMessageException
 )
 from ._base import BaseHandler
-from synapse.util.async import run_on_reactor
 from synapse.api.errors import SynapseError, Codes
 
 logger = logging.getLogger(__name__)
@@ -38,6 +38,7 @@ class IdentityHandler(BaseHandler):
         super(IdentityHandler, self).__init__(hs)
 
         self.http_client = hs.get_simple_http_client()
+        self.federation_http_client = hs.get_http_client()
 
         self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
         self.trust_any_id_server_just_for_testing_do_not_use = (
@@ -60,8 +61,6 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def threepid_from_creds(self, creds):
-        yield run_on_reactor()
-
         if 'id_server' in creds:
             id_server = creds['id_server']
         elif 'idServer' in creds:
@@ -104,7 +103,6 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def bind_threepid(self, creds, mxid):
-        yield run_on_reactor()
         logger.debug("binding threepid %r to %s", creds, mxid)
         data = None
 
@@ -139,9 +137,53 @@ class IdentityHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
-        yield run_on_reactor()
+    def unbind_threepid(self, mxid, threepid):
+        """
+        Removes a binding from an identity server
+        Args:
+            mxid (str): Matrix user ID of binding to be removed
+            threepid (dict): Dict with medium & address of binding to be removed
+
+        Returns:
+            Deferred[bool]: True on success, otherwise False
+        """
+        logger.debug("unbinding threepid %r from %s", threepid, mxid)
+        if not self.trusted_id_servers:
+            logger.warn("Can't unbind threepid: no trusted ID servers set in config")
+            defer.returnValue(False)
+
+        # We don't track what ID server we added 3pids on (perhaps we ought to)
+        # but we assume that any of the servers in the trusted list are in the
+        # same ID server federation, so we can pick any one of them to send the
+        # deletion request to.
+        id_server = next(iter(self.trusted_id_servers))
+
+        url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
+        content = {
+            "mxid": mxid,
+            "threepid": threepid,
+        }
+        headers = {}
+        # we abuse the federation http client to sign the request, but we have to send it
+        # using the normal http client since we don't want the SRV lookup and want normal
+        # 'browser-like' HTTPS.
+        self.federation_http_client.sign_request(
+            destination=None,
+            method='POST',
+            url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'),
+            headers_dict=headers,
+            content=content,
+            destination_is=id_server,
+        )
+        yield self.http_client.post_json_get_json(
+            url,
+            content,
+            headers,
+        )
+        defer.returnValue(True)
 
+    @defer.inlineCallbacks
+    def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
@@ -176,8 +218,6 @@ class IdentityHandler(BaseHandler):
             self, id_server, country, phone_number,
             client_secret, send_attempt, **kwargs
     ):
-        yield run_on_reactor()
-
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1cb81b6cf8..cbadf3c88e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,13 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import simplejson
 import sys
 
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
 import six
 from six import string_types, itervalues, iteritems
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.defer import succeed
 from twisted.python.failure import Failure
 
@@ -36,7 +35,7 @@ from synapse.events.validator import EventValidator
 from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
-from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
+from synapse.util.async import ReadWriteLock, Limiter
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import frozendict_json_encoder
@@ -157,7 +156,7 @@ class MessageHandler(BaseHandler):
             # remove the purge from the list 24 hours after it completes
             def clear_purge():
                 del self._purges_by_id[purge_id]
-            reactor.callLater(24 * 3600, clear_purge)
+            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
 
     def get_purge_status(self, purge_id):
         """Get the current status of an active purge
@@ -491,7 +490,7 @@ class EventCreationHandler(object):
                         target, e
                     )
 
-        is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+        is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
         if not is_exempt:
             yield self.assert_accepted_privacy_policy(requester)
 
@@ -509,12 +508,13 @@ class EventCreationHandler(object):
 
         defer.returnValue((event, context))
 
-    def _is_exempt_from_privacy_policy(self, builder):
+    def _is_exempt_from_privacy_policy(self, builder, requester):
         """"Determine if an event to be sent is exempt from having to consent
         to the privacy policy
 
         Args:
             builder (synapse.events.builder.EventBuilder): event being created
+            requester (Requester): user requesting this event
 
         Returns:
             Deferred[bool]: true if the event can be sent without the user
@@ -525,6 +525,9 @@ class EventCreationHandler(object):
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
                 return self._is_server_notices_room(builder.room_id)
+            elif membership == Membership.LEAVE:
+                # the user is always allowed to leave (but not kick people)
+                return builder.state_key == requester.user.to_string()
         return succeed(False)
 
     @defer.inlineCallbacks
@@ -793,7 +796,7 @@ class EventCreationHandler(object):
         # Ensure that we can round trip before trying to persist in db
         try:
             dump = frozendict_json_encoder.encode(event.content)
-            simplejson.loads(dump)
+            json.loads(dump)
         except Exception:
             logger.exception("Failed to encode content: %r", event.content)
             raise
@@ -806,6 +809,7 @@ class EventCreationHandler(object):
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
                 yield send_event_to_master(
+                    self.hs.get_clock(),
                     self.http_client,
                     host=self.config.worker_replication_host,
                     port=self.config.worker_replication_http_port,
@@ -959,9 +963,7 @@ class EventCreationHandler(object):
             event_stream_id, max_stream_id
         )
 
-        @defer.inlineCallbacks
         def _notify():
-            yield run_on_reactor()
             try:
                 self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7fe568132f..7db59fba00 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -22,7 +22,7 @@ The methods that define policy are:
     - should_notify
 """
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from contextlib import contextmanager
 
 from six import itervalues, iteritems
@@ -179,7 +179,7 @@ class PresenceHandler(object):
         # have not yet been persisted
         self.unpersisted_users_changes = set()
 
-        reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
 
         self.serial_to_user = {}
         self._next_serial = 1
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7e52adda3c..e76ef5426d 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -24,7 +24,7 @@ from synapse.api.errors import (
 from synapse.http.client import CaptchaServerHttpClient
 from synapse import types
 from synapse.types import UserID, create_requester, RoomID, RoomAlias
-from synapse.util.async import run_on_reactor, Linearizer
+from synapse.util.async import Linearizer
 from synapse.util.threepids import check_3pid_allowed
 from ._base import BaseHandler
 
@@ -139,7 +139,6 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield run_on_reactor()
         password_hash = None
         if password:
             password_hash = yield self.auth_handler().hash(password)
@@ -431,8 +430,6 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield run_on_reactor()
-
         if localpart is None:
             raise SynapseError(400, "Request must include user id")
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2abd63ad05..ab72963d87 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -23,7 +23,7 @@ from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
 from synapse.api.constants import (
     EventTypes, JoinRules, RoomCreationPreset
 )
-from synapse.api.errors import AuthError, StoreError, SynapseError
+from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
 from synapse.util import stringutils
 from synapse.visibility import filter_events_for_client
 
@@ -115,7 +115,11 @@ class RoomCreationHandler(BaseHandler):
             )
 
             if mapping:
-                raise SynapseError(400, "Room alias already taken")
+                raise SynapseError(
+                    400,
+                    "Room alias already taken",
+                    Codes.ROOM_IN_USE
+                )
         else:
             room_alias = None
 
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 1eca26aa1e..2e3a77ca4b 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -64,6 +64,13 @@ class SearchHandler(BaseHandler):
             except Exception:
                 raise SynapseError(400, "Invalid batch")
 
+        logger.info(
+            "Search batch properties: %r, %r, %r",
+            batch_group, batch_group_key, batch_token,
+        )
+
+        logger.info("Search content: %s", content)
+
         try:
             room_cat = content["search_categories"]["room_events"]
 
@@ -271,6 +278,8 @@ class SearchHandler(BaseHandler):
             # We should never get here due to the guard earlier.
             raise NotImplementedError()
 
+        logger.info("Found %d events to return", len(allowed_events))
+
         # If client has asked for "context" for each event (i.e. some surrounding
         # events and state), fetch that
         if event_context is not None:
@@ -282,6 +291,11 @@ class SearchHandler(BaseHandler):
                     event.room_id, event.event_id, before_limit, after_limit
                 )
 
+                logger.info(
+                    "Context for search returned %d and %d events",
+                    len(res["events_before"]), len(res["events_after"]),
+                )
+
                 res["events_before"] = yield filter_events_for_client(
                     self.store, user.to_string(), res["events_before"]
                 )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 51ec727df0..7f486e48e5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -145,7 +145,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
     "invited",  # InvitedSyncResult for each invited room.
     "archived",  # ArchivedSyncResult for each archived room.
     "to_device",  # List of direct messages for the device.
-    "device_lists",  # List of user_ids whose devices have chanegd
+    "device_lists",  # List of user_ids whose devices have changed
     "device_one_time_keys_count",  # Dict of algorithm to count for one time keys
                                    # for this device
     "groups",
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index a39f0f7343..7e4a114d4f 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -19,7 +19,6 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.storage.roommember import ProfileInfo
 from synapse.util.metrics import Measure
-from synapse.util.async import sleep
 from synapse.types import get_localpart_from_id
 
 from six import iteritems
@@ -174,7 +173,7 @@ class UserDirectoryHandler(object):
             logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
             yield self._handle_initial_room(room_id)
             num_processed_rooms += 1
-            yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
         logger.info("Processed all rooms.")
 
@@ -188,7 +187,7 @@ class UserDirectoryHandler(object):
                 logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
                 yield self._handle_local_user(user_id)
                 num_processed_users += 1
-                yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
+                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
 
             logger.info("Processed all users")
 
@@ -236,7 +235,7 @@ class UserDirectoryHandler(object):
         count = 0
         for user_id in user_ids:
             if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
             if not self.is_mine_id(user_id):
                 count += 1
@@ -251,7 +250,7 @@ class UserDirectoryHandler(object):
                     continue
 
                 if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
                 count += 1
 
                 user_set = (user_id, other_user_id)
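
The module-level sleep helper gives way to self.clock.sleep, which (like the
reactor changes above) routes through an injectable clock. A deferred sleep
is small enough to sketch in full; this stand-alone version, driven by a fake
reactor, shows the behaviour the handler relies on:

    from twisted.internet import defer, task

    def sleep(reactor, seconds):
        # Fire a Deferred after `seconds` without blocking the reactor.
        d = defer.Deferred()
        reactor.callLater(seconds, d.callback, None)
        return d

    clock = task.Clock()
    d = sleep(clock, 0.5)
    assert not d.called
    clock.advance(0.5)
    assert d.called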