summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/account_data.py7
-rw-r--r--synapse/handlers/admin.py7
-rw-r--r--synapse/handlers/appservice.py5
-rw-r--r--synapse/handlers/auth.py6
-rw-r--r--synapse/handlers/device.py20
-rw-r--r--synapse/handlers/devicemessage.py2
-rw-r--r--synapse/handlers/directory.py2
-rw-r--r--synapse/handlers/e2e_keys.py545
-rw-r--r--synapse/handlers/events.py6
-rw-r--r--synapse/handlers/federation.py156
-rw-r--r--synapse/handlers/groups_local.py2
-rw-r--r--synapse/handlers/identity.py6
-rw-r--r--synapse/handlers/initial_sync.py18
-rw-r--r--synapse/handlers/message.py29
-rw-r--r--synapse/handlers/pagination.py25
-rw-r--r--synapse/handlers/profile.py2
-rw-r--r--synapse/handlers/read_marker.py13
-rw-r--r--synapse/handlers/receipts.py37
-rw-r--r--synapse/handlers/register.py54
-rw-r--r--synapse/handlers/room.py45
-rw-r--r--synapse/handlers/room_member.py125
-rw-r--r--synapse/handlers/search.py24
-rw-r--r--synapse/handlers/stats.py9
-rw-r--r--synapse/handlers/sync.py36
-rw-r--r--synapse/handlers/ui_auth/checkers.py2
25 files changed, 924 insertions, 259 deletions
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 38bc67191c..2d7e6df6e4 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -38,9 +38,10 @@ class AccountDataEventSource(object):
                 {"type": "m.tag", "content": {"tags": room_tags}, "room_id": room_id}
             )
 
-        account_data, room_account_data = (
-            yield self.store.get_updated_account_data_for_user(user_id, last_stream_id)
-        )
+        (
+            account_data,
+            room_account_data,
+        ) = yield self.store.get_updated_account_data_for_user(user_id, last_stream_id)
 
         for account_data_type, content in account_data.items():
             results.append({"type": account_data_type, "content": content})
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 1a87b58838..6407d56f8e 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -30,6 +30,9 @@ class AdminHandler(BaseHandler):
     def __init__(self, hs):
         super(AdminHandler, self).__init__(hs)
 
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
+
     @defer.inlineCallbacks
     def get_whois(self, user):
         connections = []
@@ -205,7 +208,7 @@ class AdminHandler(BaseHandler):
 
                 from_key = events[-1].internal_metadata.after
 
-                events = yield filter_events_for_client(self.store, user_id, events)
+                events = yield filter_events_for_client(self.storage, user_id, events)
 
                 writer.write_events(room_id, events)
 
@@ -241,7 +244,7 @@ class AdminHandler(BaseHandler):
             for event_id in extremities:
                 if not event_to_unseen_prevs[event_id]:
                     continue
-                state = yield self.store.get_state_for_event(event_id)
+                state = yield self.state_store.get_state_for_event(event_id)
                 writer.write_state(room_id, event_id, state)
 
         return writer.finished()
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3e9b298154..fe62f78e67 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -73,7 +73,10 @@ class ApplicationServicesHandler(object):
             try:
                 limit = 100
                 while True:
-                    upper_bound, events = yield self.store.get_new_events_for_appservice(
+                    (
+                        upper_bound,
+                        events,
+                    ) = yield self.store.get_new_events_for_appservice(
                         self.current_max, limit
                     )
 
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 333eb30625..7a0f54ca24 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -525,7 +525,7 @@ class AuthHandler(BaseHandler):
 
         result = None
         if not user_infos:
-            logger.warn("Attempted to login as %s but they do not exist", user_id)
+            logger.warning("Attempted to login as %s but they do not exist", user_id)
         elif len(user_infos) == 1:
             # a single match (possibly not exact)
             result = user_infos.popitem()
@@ -534,7 +534,7 @@ class AuthHandler(BaseHandler):
             result = (user_id, user_infos[user_id])
         else:
             # multiple matches, none of them exact
-            logger.warn(
+            logger.warning(
                 "Attempted to login as %s but it matches more than one user "
                 "inexactly: %r",
                 user_id,
@@ -728,7 +728,7 @@ class AuthHandler(BaseHandler):
 
         result = yield self.validate_hash(password, password_hash)
         if not result:
-            logger.warn("Failed password login for user %s", user_id)
+            logger.warning("Failed password login for user %s", user_id)
             return None
         return user_id
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5f23ee4488..26ef5e150c 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -46,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
 
         self.hs = hs
         self.state = hs.get_state_handler()
+        self.state_store = hs.get_storage().state
         self._auth_handler = hs.get_auth_handler()
 
     @trace
@@ -178,7 +179,7 @@ class DeviceWorkerHandler(BaseHandler):
                 continue
 
             # mapping from event_id -> state_dict
-            prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+            prev_state_ids = yield self.state_store.get_state_ids_for_events(event_ids)
 
             # Check if we've joined the room? If so we just blindly add all the users to
             # the "possibly changed" users.
@@ -458,7 +459,18 @@ class DeviceHandler(DeviceWorkerHandler):
     @defer.inlineCallbacks
     def on_federation_query_user_devices(self, user_id):
         stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
-        return {"user_id": user_id, "stream_id": stream_id, "devices": devices}
+        master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
+        self_signing_key = yield self.store.get_e2e_cross_signing_key(
+            user_id, "self_signing"
+        )
+
+        return {
+            "user_id": user_id,
+            "stream_id": stream_id,
+            "devices": devices,
+            "master_key": master_key,
+            "self_signing_key": self_signing_key,
+        }
 
     @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
@@ -656,7 +668,7 @@ class DeviceListUpdater(object):
         except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
             # TODO: Remember that we are now out of sync and try again
             # later
-            logger.warn("Failed to handle device list update for %s", user_id)
+            logger.warning("Failed to handle device list update for %s", user_id)
             # We abort on exceptions rather than accepting the update
             # as otherwise synapse will 'forget' that its device list
             # is out of date. If we bail then we will retry the resync
@@ -694,7 +706,7 @@ class DeviceListUpdater(object):
         # up on storing the total list of devices and only handle the
         # delta instead.
         if len(devices) > 1000:
-            logger.warn(
+            logger.warning(
                 "Ignoring device list snapshot for %s as it has >1K devs (%d)",
                 user_id,
                 len(devices),
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 0043cbea17..73b9e120f5 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -52,7 +52,7 @@ class DeviceMessageHandler(object):
         local_messages = {}
         sender_user_id = content["sender"]
         if origin != get_domain_from_id(sender_user_id):
-            logger.warn(
+            logger.warning(
                 "Dropping device message from %r with spoofed sender %r",
                 origin,
                 sender_user_id,
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 526379c6f7..c4632f8984 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -250,7 +250,7 @@ class DirectoryHandler(BaseHandler):
                     ignore_backoff=True,
                 )
             except CodeMessageException as e:
-                logging.warn("Error retrieving alias")
+                logging.warning("Error retrieving alias")
                 if e.code == 404:
                     result = None
                 else:
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index be675197f1..f09a0b73c8 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -19,12 +19,15 @@ import logging
 
 from six import iteritems
 
+import attr
 from canonicaljson import encode_canonical_json, json
+from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import SignatureVerifyException, verify_signed_json
+from unpaddedbase64 import decode_base64
 
 from twisted.internet import defer
 
-from synapse.api.errors import CodeMessageException, Codes, SynapseError
+from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
 from synapse.types import (
@@ -33,6 +36,8 @@ from synapse.types import (
     get_verify_key_from_cross_signing_key,
 )
 from synapse.util import unwrapFirstError
+from synapse.util.async_helpers import Linearizer
+from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
@@ -46,10 +51,19 @@ class E2eKeysHandler(object):
         self.is_mine = hs.is_mine
         self.clock = hs.get_clock()
 
+        self._edu_updater = SigningKeyEduUpdater(hs, self)
+
+        federation_registry = hs.get_federation_registry()
+
+        # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
+        federation_registry.register_edu_handler(
+            "org.matrix.signing_key_update",
+            self._edu_updater.incoming_signing_key_update,
+        )
         # doesn't really work as part of the generic query API, because the
         # query request requires an object POST, but we abuse the
         # "query handler" interface.
-        hs.get_federation_registry().register_query_handler(
+        federation_registry.register_query_handler(
             "client_keys", self.on_federation_query_client_keys
         )
 
@@ -116,9 +130,10 @@ class E2eKeysHandler(object):
                 else:
                     query_list.append((user_id, None))
 
-            user_ids_not_in_cache, remote_results = (
-                yield self.store.get_user_devices_from_cache(query_list)
-            )
+            (
+                user_ids_not_in_cache,
+                remote_results,
+            ) = yield self.store.get_user_devices_from_cache(query_list)
             for user_id, devices in iteritems(remote_results):
                 user_devices = results.setdefault(user_id, {})
                 for device_id, device in iteritems(devices):
@@ -204,13 +219,15 @@ class E2eKeysHandler(object):
                     if user_id in destination_query:
                         results[user_id] = keys
 
-                for user_id, key in remote_result["master_keys"].items():
-                    if user_id in destination_query:
-                        cross_signing_keys["master_keys"][user_id] = key
+                if "master_keys" in remote_result:
+                    for user_id, key in remote_result["master_keys"].items():
+                        if user_id in destination_query:
+                            cross_signing_keys["master_keys"][user_id] = key
 
-                for user_id, key in remote_result["self_signing_keys"].items():
-                    if user_id in destination_query:
-                        cross_signing_keys["self_signing_keys"][user_id] = key
+                if "self_signing_keys" in remote_result:
+                    for user_id, key in remote_result["self_signing_keys"].items():
+                        if user_id in destination_query:
+                            cross_signing_keys["self_signing_keys"][user_id] = key
 
             except Exception as e:
                 failure = _exception_to_failure(e)
@@ -248,7 +265,7 @@ class E2eKeysHandler(object):
 
         Returns:
             defer.Deferred[dict[str, dict[str, dict]]]: map from
-                (master|self_signing|user_signing) -> user_id -> key
+                (master_keys|self_signing_keys|user_signing_keys) -> user_id -> key
         """
         master_keys = {}
         self_signing_keys = {}
@@ -340,7 +357,16 @@ class E2eKeysHandler(object):
         """
         device_keys_query = query_body.get("device_keys", {})
         res = yield self.query_local_devices(device_keys_query)
-        return {"device_keys": res}
+        ret = {"device_keys": res}
+
+        # add in the cross-signing keys
+        cross_signing_keys = yield self.get_cross_signing_keys_from_cache(
+            device_keys_query, None
+        )
+
+        ret.update(cross_signing_keys)
+
+        return ret
 
     @trace
     @defer.inlineCallbacks
@@ -602,6 +628,349 @@ class E2eKeysHandler(object):
 
         return {}
 
+    @defer.inlineCallbacks
+    def upload_signatures_for_device_keys(self, user_id, signatures):
+        """Upload device signatures for cross-signing
+
+        Args:
+            user_id (string): the user uploading the signatures
+            signatures (dict[string, dict[string, dict]]): map of users to
+                devices to signed keys. This is the submission from the user; an
+                exception will be raised if it is malformed.
+        Returns:
+            dict: response to be sent back to the client.  The response will have
+                a "failures" key, which will be a dict mapping users to devices
+                to errors for the signatures that failed.
+        Raises:
+            SynapseError: if the signatures dict is not valid.
+        """
+        failures = {}
+
+        # signatures to be stored.  Each item will be a SignatureListItem
+        signature_list = []
+
+        # split between checking signatures for own user and signatures for
+        # other users, since we verify them with different keys
+        self_signatures = signatures.get(user_id, {})
+        other_signatures = {k: v for k, v in signatures.items() if k != user_id}
+
+        self_signature_list, self_failures = yield self._process_self_signatures(
+            user_id, self_signatures
+        )
+        signature_list.extend(self_signature_list)
+        failures.update(self_failures)
+
+        other_signature_list, other_failures = yield self._process_other_signatures(
+            user_id, other_signatures
+        )
+        signature_list.extend(other_signature_list)
+        failures.update(other_failures)
+
+        # store the signature, and send the appropriate notifications for sync
+        logger.debug("upload signature failures: %r", failures)
+        yield self.store.store_e2e_cross_signing_signatures(user_id, signature_list)
+
+        self_device_ids = [item.target_device_id for item in self_signature_list]
+        if self_device_ids:
+            yield self.device_handler.notify_device_update(user_id, self_device_ids)
+        signed_users = [item.target_user_id for item in other_signature_list]
+        if signed_users:
+            yield self.device_handler.notify_user_signature_update(
+                user_id, signed_users
+            )
+
+        return {"failures": failures}
+
+    @defer.inlineCallbacks
+    def _process_self_signatures(self, user_id, signatures):
+        """Process uploaded signatures of the user's own keys.
+
+        Signatures of the user's own keys from this API come in two forms:
+        - signatures of the user's devices by the user's self-signing key,
+        - signatures of the user's master key by the user's devices.
+
+        Args:
+            user_id (string): the user uploading the keys
+            signatures (dict[string, dict]): map of devices to signed keys
+
+        Returns:
+            (list[SignatureListItem], dict[string, dict[string, dict]]):
+            a list of signatures to store, and a map of users to devices to failure
+            reasons
+
+        Raises:
+            SynapseError: if the input is malformed
+        """
+        signature_list = []
+        failures = {}
+        if not signatures:
+            return signature_list, failures
+
+        if not isinstance(signatures, dict):
+            raise SynapseError(400, "Invalid parameter", Codes.INVALID_PARAM)
+
+        try:
+            # get our self-signing key to verify the signatures
+            (
+                _,
+                self_signing_key_id,
+                self_signing_verify_key,
+            ) = yield self._get_e2e_cross_signing_verify_key(user_id, "self_signing")
+
+            # get our master key, since we may have received a signature of it.
+            # We need to fetch it here so that we know what its key ID is, so
+            # that we can check if a signature that was sent is a signature of
+            # the master key or of a device
+            (
+                master_key,
+                _,
+                master_verify_key,
+            ) = yield self._get_e2e_cross_signing_verify_key(user_id, "master")
+
+            # fetch our stored devices.  This is used to 1. verify
+            # signatures on the master key, and 2. to compare with what
+            # was sent if the device was signed
+            devices = yield self.store.get_e2e_device_keys([(user_id, None)])
+
+            if user_id not in devices:
+                raise NotFoundError("No device keys found")
+
+            devices = devices[user_id]
+        except SynapseError as e:
+            failure = _exception_to_failure(e)
+            failures[user_id] = {device: failure for device in signatures.keys()}
+            return signature_list, failures
+
+        for device_id, device in signatures.items():
+            # make sure submitted data is in the right form
+            if not isinstance(device, dict):
+                raise SynapseError(400, "Invalid parameter", Codes.INVALID_PARAM)
+
+            try:
+                if "signatures" not in device or user_id not in device["signatures"]:
+                    # no signature was sent
+                    raise SynapseError(
+                        400, "Invalid signature", Codes.INVALID_SIGNATURE
+                    )
+
+                if device_id == master_verify_key.version:
+                    # The signature is of the master key. This needs to be
+                    # handled differently from signatures of normal devices.
+                    master_key_signature_list = self._check_master_key_signature(
+                        user_id, device_id, device, master_key, devices
+                    )
+                    signature_list.extend(master_key_signature_list)
+                    continue
+
+                # at this point, we have a device that should be signed
+                # by the self-signing key
+                if self_signing_key_id not in device["signatures"][user_id]:
+                    # no signature was sent
+                    raise SynapseError(
+                        400, "Invalid signature", Codes.INVALID_SIGNATURE
+                    )
+
+                try:
+                    stored_device = devices[device_id]
+                except KeyError:
+                    raise NotFoundError("Unknown device")
+                if self_signing_key_id in stored_device.get("signatures", {}).get(
+                    user_id, {}
+                ):
+                    # we already have a signature on this device, so we
+                    # can skip it, since it should be exactly the same
+                    continue
+
+                _check_device_signature(
+                    user_id, self_signing_verify_key, device, stored_device
+                )
+
+                signature = device["signatures"][user_id][self_signing_key_id]
+                signature_list.append(
+                    SignatureListItem(
+                        self_signing_key_id, user_id, device_id, signature
+                    )
+                )
+            except SynapseError as e:
+                failures.setdefault(user_id, {})[device_id] = _exception_to_failure(e)
+
+        return signature_list, failures
+
+    def _check_master_key_signature(
+        self, user_id, master_key_id, signed_master_key, stored_master_key, devices
+    ):
+        """Check signatures of a user's master key made by their devices.
+
+        Args:
+            user_id (string): the user whose master key is being checked
+            master_key_id (string): the ID of the user's master key
+            signed_master_key (dict): the user's signed master key that was uploaded
+            stored_master_key (dict): our previously-stored copy of the user's master key
+            devices (iterable(dict)): the user's devices
+
+        Returns:
+            list[SignatureListItem]: a list of signatures to store
+
+        Raises:
+            SynapseError: if a signature is invalid
+        """
+        # for each device that signed the master key, check the signature.
+        master_key_signature_list = []
+        sigs = signed_master_key["signatures"]
+        for signing_key_id, signature in sigs[user_id].items():
+            _, signing_device_id = signing_key_id.split(":", 1)
+            if (
+                signing_device_id not in devices
+                or signing_key_id not in devices[signing_device_id]["keys"]
+            ):
+                # signed by an unknown device, or the
+                # device does not have the key
+                raise SynapseError(400, "Invalid signature", Codes.INVALID_SIGNATURE)
+
+            # get the key and check the signature
+            pubkey = devices[signing_device_id]["keys"][signing_key_id]
+            verify_key = decode_verify_key_bytes(signing_key_id, decode_base64(pubkey))
+            _check_device_signature(
+                user_id, verify_key, signed_master_key, stored_master_key
+            )
+
+            master_key_signature_list.append(
+                SignatureListItem(signing_key_id, user_id, master_key_id, signature)
+            )
+
+        return master_key_signature_list
+
+    @defer.inlineCallbacks
+    def _process_other_signatures(self, user_id, signatures):
+        """Process uploaded signatures of other users' keys.  These will be the
+        target user's master keys, signed by the uploading user's user-signing
+        key.
+
+        Args:
+            user_id (string): the user uploading the keys
+            signatures (dict[string, dict]): map of users to devices to signed keys
+
+        Returns:
+            (list[SignatureListItem], dict[string, dict[string, dict]]):
+            a list of signatures to store, and a map of users to devices to failure
+            reasons
+
+        Raises:
+            SynapseError: if the input is malformed
+        """
+        signature_list = []
+        failures = {}
+        if not signatures:
+            return signature_list, failures
+
+        try:
+            # get our user-signing key to verify the signatures
+            (
+                user_signing_key,
+                user_signing_key_id,
+                user_signing_verify_key,
+            ) = yield self._get_e2e_cross_signing_verify_key(user_id, "user_signing")
+        except SynapseError as e:
+            failure = _exception_to_failure(e)
+            for user, devicemap in signatures.items():
+                failures[user] = {device_id: failure for device_id in devicemap.keys()}
+            return signature_list, failures
+
+        for target_user, devicemap in signatures.items():
+            # make sure submitted data is in the right form
+            if not isinstance(devicemap, dict):
+                raise SynapseError(400, "Invalid parameter", Codes.INVALID_PARAM)
+            for device in devicemap.values():
+                if not isinstance(device, dict):
+                    raise SynapseError(400, "Invalid parameter", Codes.INVALID_PARAM)
+
+            device_id = None
+            try:
+                # get the target user's master key, to make sure it matches
+                # what was sent
+                (
+                    master_key,
+                    master_key_id,
+                    _,
+                ) = yield self._get_e2e_cross_signing_verify_key(
+                    target_user, "master", user_id
+                )
+
+                # make sure that the target user's master key is the one that
+                # was signed (and no others)
+                device_id = master_key_id.split(":", 1)[1]
+                if device_id not in devicemap:
+                    logger.debug(
+                        "upload signature: could not find signature for device %s",
+                        device_id,
+                    )
+                    # set device to None so that the failure gets
+                    # marked on all the signatures
+                    device_id = None
+                    raise NotFoundError("Unknown device")
+                key = devicemap[device_id]
+                other_devices = [k for k in devicemap.keys() if k != device_id]
+                if other_devices:
+                    # other devices were signed -- mark those as failures
+                    logger.debug("upload signature: too many devices specified")
+                    failure = _exception_to_failure(NotFoundError("Unknown device"))
+                    failures[target_user] = {
+                        device: failure for device in other_devices
+                    }
+
+                if user_signing_key_id in master_key.get("signatures", {}).get(
+                    user_id, {}
+                ):
+                    # we already have the signature, so we can skip it
+                    continue
+
+                _check_device_signature(
+                    user_id, user_signing_verify_key, key, master_key
+                )
+
+                signature = key["signatures"][user_id][user_signing_key_id]
+                signature_list.append(
+                    SignatureListItem(
+                        user_signing_key_id, target_user, device_id, signature
+                    )
+                )
+            except SynapseError as e:
+                failure = _exception_to_failure(e)
+                if device_id is None:
+                    failures[target_user] = {
+                        device_id: failure for device_id in devicemap.keys()
+                    }
+                else:
+                    failures.setdefault(target_user, {})[device_id] = failure
+
+        return signature_list, failures
+
+    @defer.inlineCallbacks
+    def _get_e2e_cross_signing_verify_key(self, user_id, key_type, from_user_id=None):
+        """Fetch the cross-signing public key from storage and interpret it.
+
+        Args:
+            user_id (str): the user whose key should be fetched
+            key_type (str): the type of key to fetch
+            from_user_id (str): the user that we are fetching the keys for.
+                This affects what signatures are fetched.
+
+        Returns:
+            dict, str, VerifyKey: the raw key data, the key ID, and the
+                signedjson verify key
+
+        Raises:
+            NotFoundError: if the key is not found
+        """
+        key = yield self.store.get_e2e_cross_signing_key(
+            user_id, key_type, from_user_id
+        )
+        if key is None:
+            logger.debug("no %s key found for %s", key_type, user_id)
+            raise NotFoundError("No %s key found for %s" % (key_type, user_id))
+        key_id, verify_key = get_verify_key_from_cross_signing_key(key)
+        return key, key_id, verify_key
+
 
 def _check_cross_signing_key(key, user_id, key_type, signing_key=None):
     """Check a cross-signing key uploaded by a user.  Performs some basic sanity
@@ -630,7 +999,49 @@ def _check_cross_signing_key(key, user_id, key_type, signing_key=None):
             )
 
 
+def _check_device_signature(user_id, verify_key, signed_device, stored_device):
+    """Check that a signature on a device or cross-signing key is correct and
+    matches the copy of the device/key that we have stored.  Throws an
+    exception if an error is detected.
+
+    Args:
+        user_id (str): the user ID whose signature is being checked
+        verify_key (VerifyKey): the key to verify the device with
+        signed_device (dict): the uploaded signed device data
+        stored_device (dict): our previously stored copy of the device
+
+    Raises:
+        SynapseError: if the signature was invalid or the sent device is not the
+            same as the stored device
+
+    """
+
+    # make sure that the device submitted matches what we have stored
+    stripped_signed_device = {
+        k: v for k, v in signed_device.items() if k not in ["signatures", "unsigned"]
+    }
+    stripped_stored_device = {
+        k: v for k, v in stored_device.items() if k not in ["signatures", "unsigned"]
+    }
+    if stripped_signed_device != stripped_stored_device:
+        logger.debug(
+            "upload signatures: key does not match %s vs %s",
+            signed_device,
+            stored_device,
+        )
+        raise SynapseError(400, "Key does not match")
+
+    try:
+        verify_signed_json(signed_device, user_id, verify_key)
+    except SignatureVerifyException:
+        logger.debug("invalid signature on key")
+        raise SynapseError(400, "Invalid signature", Codes.INVALID_SIGNATURE)
+
+
 def _exception_to_failure(e):
+    if isinstance(e, SynapseError):
+        return {"status": e.code, "errcode": e.errcode, "message": str(e)}
+
     if isinstance(e, CodeMessageException):
         return {"status": e.code, "message": str(e)}
 
@@ -658,3 +1069,111 @@ def _one_time_keys_match(old_key_json, new_key):
     new_key_copy.pop("signatures", None)
 
     return old_key == new_key_copy
+
+
+@attr.s
+class SignatureListItem:
+    """An item in the signature list as used by upload_signatures_for_device_keys.
+    """
+
+    signing_key_id = attr.ib()
+    target_user_id = attr.ib()
+    target_device_id = attr.ib()
+    signature = attr.ib()
+
+
+class SigningKeyEduUpdater(object):
+    """Handles incoming signing key updates from federation and updates the DB"""
+
+    def __init__(self, hs, e2e_keys_handler):
+        self.store = hs.get_datastore()
+        self.federation = hs.get_federation_client()
+        self.clock = hs.get_clock()
+        self.e2e_keys_handler = e2e_keys_handler
+
+        self._remote_edu_linearizer = Linearizer(name="remote_signing_key")
+
+        # user_id -> list of updates waiting to be handled.
+        self._pending_updates = {}
+
+        # Recently seen stream ids. We don't bother keeping these in the DB,
+        # but they're useful to have them about to reduce the number of spurious
+        # resyncs.
+        self._seen_updates = ExpiringCache(
+            cache_name="signing_key_update_edu",
+            clock=self.clock,
+            max_len=10000,
+            expiry_ms=30 * 60 * 1000,
+            iterable=True,
+        )
+
+    @defer.inlineCallbacks
+    def incoming_signing_key_update(self, origin, edu_content):
+        """Called on incoming signing key update from federation. Responsible for
+        parsing the EDU and adding to pending updates list.
+
+        Args:
+            origin (string): the server that sent the EDU
+            edu_content (dict): the contents of the EDU
+        """
+
+        user_id = edu_content.pop("user_id")
+        master_key = edu_content.pop("master_key", None)
+        self_signing_key = edu_content.pop("self_signing_key", None)
+
+        if get_domain_from_id(user_id) != origin:
+            logger.warning("Got signing key update edu for %r from %r", user_id, origin)
+            return
+
+        room_ids = yield self.store.get_rooms_for_user(user_id)
+        if not room_ids:
+            # We don't share any rooms with this user. Ignore update, as we
+            # probably won't get any further updates.
+            return
+
+        self._pending_updates.setdefault(user_id, []).append(
+            (master_key, self_signing_key)
+        )
+
+        yield self._handle_signing_key_updates(user_id)
+
+    @defer.inlineCallbacks
+    def _handle_signing_key_updates(self, user_id):
+        """Actually handle pending updates.
+
+        Args:
+            user_id (string): the user whose updates we are processing
+        """
+
+        device_handler = self.e2e_keys_handler.device_handler
+
+        with (yield self._remote_edu_linearizer.queue(user_id)):
+            pending_updates = self._pending_updates.pop(user_id, [])
+            if not pending_updates:
+                # This can happen since we batch updates
+                return
+
+            device_ids = []
+
+            logger.info("pending updates: %r", pending_updates)
+
+            for master_key, self_signing_key in pending_updates:
+                if master_key:
+                    yield self.store.set_e2e_cross_signing_key(
+                        user_id, "master", master_key
+                    )
+                    _, verify_key = get_verify_key_from_cross_signing_key(master_key)
+                    # verify_key is a VerifyKey from signedjson, which uses
+                    # .version to denote the portion of the key ID after the
+                    # algorithm and colon, which is the device ID
+                    device_ids.append(verify_key.version)
+                if self_signing_key:
+                    yield self.store.set_e2e_cross_signing_key(
+                        user_id, "self_signing", self_signing_key
+                    )
+                    _, verify_key = get_verify_key_from_cross_signing_key(
+                        self_signing_key
+                    )
+                    device_ids.append(verify_key.version)
+
+            yield device_handler.notify_device_update(user_id, device_ids)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 5e748687e3..45fe13c62f 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -147,6 +147,10 @@ class EventStreamHandler(BaseHandler):
 
 
 class EventHandler(BaseHandler):
+    def __init__(self, hs):
+        super(EventHandler, self).__init__(hs)
+        self.storage = hs.get_storage()
+
     @defer.inlineCallbacks
     def get_event(self, user, room_id, event_id):
         """Retrieve a single specified event.
@@ -172,7 +176,7 @@ class EventHandler(BaseHandler):
         is_peeking = user.to_string() not in users
 
         filtered = yield filter_events_for_client(
-            self.store, user.to_string(), [event], is_peeking=is_peeking
+            self.storage, user.to_string(), [event], is_peeking=is_peeking
         )
 
         if not filtered:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 4b4c6c15f9..05dd8d2671 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.event_auth import auth_types_for_event
+from synapse.events.snapshot import EventContext
 from synapse.events.validator import EventValidator
 from synapse.logging.context import (
     make_deferred_yieldable,
@@ -109,6 +110,8 @@ class FederationHandler(BaseHandler):
         self.hs = hs
 
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
@@ -180,7 +183,7 @@ class FederationHandler(BaseHandler):
         try:
             self._sanity_check_event(pdu)
         except SynapseError as err:
-            logger.warn(
+            logger.warning(
                 "[%s %s] Received event failed sanity checks", room_id, event_id
             )
             raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
@@ -301,7 +304,7 @@ class FederationHandler(BaseHandler):
                 # following.
 
                 if sent_to_us_directly:
-                    logger.warn(
+                    logger.warning(
                         "[%s %s] Rejecting: failed to fetch %d prev events: %s",
                         room_id,
                         event_id,
@@ -324,7 +327,7 @@ class FederationHandler(BaseHandler):
                 event_map = {event_id: pdu}
                 try:
                     # Get the state of the events we know about
-                    ours = yield self.store.get_state_groups_ids(room_id, seen)
+                    ours = yield self.state_store.get_state_groups_ids(room_id, seen)
 
                     # state_maps is a list of mappings from (type, state_key) to event_id
                     state_maps = list(
@@ -350,10 +353,11 @@ class FederationHandler(BaseHandler):
                             # note that if any of the missing prevs share missing state or
                             # auth events, the requests to fetch those events are deduped
                             # by the get_pdu_cache in federation_client.
-                            remote_state, got_auth_chain = (
-                                yield self.federation_client.get_state_for_room(
-                                    origin, room_id, p
-                                )
+                            (
+                                remote_state,
+                                got_auth_chain,
+                            ) = yield self.federation_client.get_state_for_room(
+                                origin, room_id, p
                             )
 
                             # we want the state *after* p; get_state_for_room returns the
@@ -405,7 +409,7 @@ class FederationHandler(BaseHandler):
                     state = [event_map[e] for e in six.itervalues(state_map)]
                     auth_chain = list(auth_chains)
                 except Exception:
-                    logger.warn(
+                    logger.warning(
                         "[%s %s] Error attempting to resolve state at missing "
                         "prev_events",
                         room_id,
@@ -518,7 +522,9 @@ class FederationHandler(BaseHandler):
             # We failed to get the missing events, but since we need to handle
             # the case of `get_missing_events` not returning the necessary
             # events anyway, it is safe to simply log the error and continue.
-            logger.warn("[%s %s]: Failed to get prev_events: %s", room_id, event_id, e)
+            logger.warning(
+                "[%s %s]: Failed to get prev_events: %s", room_id, event_id, e
+            )
             return
 
         logger.info(
@@ -545,7 +551,7 @@ class FederationHandler(BaseHandler):
                     yield self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
                 except FederationError as e:
                     if e.code == 403:
-                        logger.warn(
+                        logger.warning(
                             "[%s %s] Received prev_event %s failed history check.",
                             room_id,
                             event_id,
@@ -888,7 +894,7 @@ class FederationHandler(BaseHandler):
         # We set `check_history_visibility_only` as we might otherwise get false
         # positives from users having been erased.
         filtered_extremities = yield filter_events_for_server(
-            self.store,
+            self.storage,
             self.server_name,
             list(extremities_events.values()),
             redact=False,
@@ -1059,7 +1065,7 @@ class FederationHandler(BaseHandler):
             SynapseError if the event does not pass muster
         """
         if len(ev.prev_event_ids()) > 20:
-            logger.warn(
+            logger.warning(
                 "Rejecting event %s which has %i prev_events",
                 ev.event_id,
                 len(ev.prev_event_ids()),
@@ -1067,7 +1073,7 @@ class FederationHandler(BaseHandler):
             raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events")
 
         if len(ev.auth_event_ids()) > 10:
-            logger.warn(
+            logger.warning(
                 "Rejecting event %s which has %i auth_events",
                 ev.event_id,
                 len(ev.auth_event_ids()),
@@ -1101,7 +1107,7 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def do_invite_join(self, target_hosts, room_id, joinee, content):
         """ Attempts to join the `joinee` to the room `room_id` via the
-        server `target_host`.
+        servers contained in `target_hosts`.
 
         This first triggers a /make_join/ request that returns a partial
         event that we can fill out and sign. This is then sent to the
@@ -1110,6 +1116,15 @@ class FederationHandler(BaseHandler):
 
         We suspend processing of any received events from this room until we
         have finished processing the join.
+
+        Args:
+            target_hosts (Iterable[str]): List of servers to attempt to join the room with.
+
+            room_id (str): The ID of the room to join.
+
+            joinee (str): The User ID of the joining user.
+
+            content (dict): The event content to use for the join event.
         """
         logger.debug("Joining %s to %s", joinee, room_id)
 
@@ -1169,6 +1184,22 @@ class FederationHandler(BaseHandler):
 
             yield self._persist_auth_tree(origin, auth_chain, state, event)
 
+            # Check whether this room is the result of an upgrade of a room we already know
+            # about. If so, migrate over user information
+            predecessor = yield self.store.get_room_predecessor(room_id)
+            if not predecessor:
+                return
+            old_room_id = predecessor["room_id"]
+            logger.debug(
+                "Found predecessor for %s during remote join: %s", room_id, old_room_id
+            )
+
+            # We retrieve the room member handler here as to not cause a cyclic dependency
+            member_handler = self.hs.get_room_member_handler()
+            yield member_handler.transfer_room_state_on_room_upgrade(
+                old_room_id, room_id
+            )
+
             logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
@@ -1203,7 +1234,7 @@ class FederationHandler(BaseHandler):
                 with nested_logging_context(p.event_id):
                     yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
             except Exception as e:
-                logger.warn(
+                logger.warning(
                     "Error handling queued PDU %s from %s: %s", p.event_id, origin, e
                 )
 
@@ -1222,7 +1253,6 @@ class FederationHandler(BaseHandler):
         Returns:
             Deferred[FrozenEvent]
         """
-
         if get_domain_from_id(user_id) != origin:
             logger.info(
                 "Got /make_join request for user %r from different origin %s, ignoring",
@@ -1251,7 +1281,7 @@ class FederationHandler(BaseHandler):
                 builder=builder
             )
         except AuthError as e:
-            logger.warn("Failed to create join %r because %s", event, e)
+            logger.warning("Failed to create join to %s because %s", room_id, e)
             raise e
 
         event_allowed = yield self.third_party_event_rules.check_event_allowed(
@@ -1280,11 +1310,20 @@ class FederationHandler(BaseHandler):
         event = pdu
 
         logger.debug(
-            "on_send_join_request: Got event: %s, signatures: %s",
+            "on_send_join_request from %s: Got event: %s, signatures: %s",
+            origin,
             event.event_id,
             event.signatures,
         )
 
+        if get_domain_from_id(event.sender) != origin:
+            logger.info(
+                "Got /send_join request for user %r from different origin %s",
+                event.sender,
+                origin,
+            )
+            raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
+
         event.internal_metadata.outlier = False
         # Send this event on behalf of the origin server.
         #
@@ -1486,7 +1525,7 @@ class FederationHandler(BaseHandler):
                 room_version, event, context, do_sig_check=False
             )
         except AuthError as e:
-            logger.warn("Failed to create new leave %r because %s", event, e)
+            logger.warning("Failed to create new leave %r because %s", event, e)
             raise e
 
         return event
@@ -1503,6 +1542,14 @@ class FederationHandler(BaseHandler):
             event.signatures,
         )
 
+        if get_domain_from_id(event.sender) != origin:
+            logger.info(
+                "Got /send_leave request for user %r from different origin %s",
+                event.sender,
+                origin,
+            )
+            raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
+
         event.internal_metadata.outlier = False
 
         context = yield self._handle_new_event(origin, event)
@@ -1533,7 +1580,7 @@ class FederationHandler(BaseHandler):
             event_id, allow_none=False, check_room_id=room_id
         )
 
-        state_groups = yield self.store.get_state_groups(room_id, [event_id])
+        state_groups = yield self.state_store.get_state_groups(room_id, [event_id])
 
         if state_groups:
             _, state = list(iteritems(state_groups)).pop()
@@ -1562,7 +1609,7 @@ class FederationHandler(BaseHandler):
             event_id, allow_none=False, check_room_id=room_id
         )
 
-        state_groups = yield self.store.get_state_groups_ids(room_id, [event_id])
+        state_groups = yield self.state_store.get_state_groups_ids(room_id, [event_id])
 
         if state_groups:
             _, state = list(state_groups.items()).pop()
@@ -1590,7 +1637,7 @@ class FederationHandler(BaseHandler):
 
         events = yield self.store.get_backfill_events(room_id, pdu_list, limit)
 
-        events = yield filter_events_for_server(self.store, origin, events)
+        events = yield filter_events_for_server(self.storage, origin, events)
 
         return events
 
@@ -1620,7 +1667,7 @@ class FederationHandler(BaseHandler):
             if not in_room:
                 raise AuthError(403, "Host not in room.")
 
-            events = yield filter_events_for_server(self.store, origin, [event])
+            events = yield filter_events_for_server(self.storage, origin, [event])
             event = events[0]
             return event
         else:
@@ -1641,7 +1688,11 @@ class FederationHandler(BaseHandler):
         # hack around with a try/finally instead.
         success = False
         try:
-            if not event.internal_metadata.is_outlier() and not backfilled:
+            if (
+                not event.internal_metadata.is_outlier()
+                and not backfilled
+                and not context.rejected
+            ):
                 yield self.action_generator.handle_push_actions_for_event(
                     event, context
                 )
@@ -1772,7 +1823,7 @@ class FederationHandler(BaseHandler):
                 # cause SynapseErrors in auth.check. We don't want to give up
                 # the attempt to federate altogether in such cases.
 
-                logger.warn("Rejecting %s because %s", e.event_id, err.msg)
+                logger.warning("Rejecting %s because %s", e.event_id, err.msg)
 
                 if e == event:
                     raise
@@ -1825,12 +1876,7 @@ class FederationHandler(BaseHandler):
                 if c and c.type == EventTypes.Create:
                     auth_events[(c.type, c.state_key)] = c
 
-        try:
-            yield self.do_auth(origin, event, context, auth_events=auth_events)
-        except AuthError as e:
-            logger.warn("[%s %s] Rejecting: %s", event.room_id, event.event_id, e.msg)
-
-            context.rejected = RejectedReason.AUTH_ERROR
+        context = yield self.do_auth(origin, event, context, auth_events=auth_events)
 
         if not context.rejected:
             yield self._check_for_soft_fail(event, state, backfilled)
@@ -1886,7 +1932,7 @@ class FederationHandler(BaseHandler):
                 # given state at the event. This should correctly handle cases
                 # like bans, especially with state res v2.
 
-                state_sets = yield self.store.get_state_groups(
+                state_sets = yield self.state_store.get_state_groups(
                     event.room_id, extrem_ids
                 )
                 state_sets = list(state_sets.values())
@@ -1922,7 +1968,7 @@ class FederationHandler(BaseHandler):
             try:
                 event_auth.check(room_version, event, auth_events=current_auth_events)
             except AuthError as e:
-                logger.warn("Soft-failing %r because %s", event, e)
+                logger.warning("Soft-failing %r because %s", event, e)
                 event.internal_metadata.soft_failed = True
 
     @defer.inlineCallbacks
@@ -1977,7 +2023,7 @@ class FederationHandler(BaseHandler):
         )
 
         missing_events = yield filter_events_for_server(
-            self.store, origin, missing_events
+            self.storage, origin, missing_events
         )
 
         return missing_events
@@ -1999,12 +2045,12 @@ class FederationHandler(BaseHandler):
 
                 Also NB that this function adds entries to it.
         Returns:
-            defer.Deferred[None]
+            defer.Deferred[EventContext]: updated context object
         """
         room_version = yield self.store.get_room_version(event.room_id)
 
         try:
-            yield self._update_auth_events_and_context_for_auth(
+            context = yield self._update_auth_events_and_context_for_auth(
                 origin, event, context, auth_events
             )
         except Exception:
@@ -2021,8 +2067,10 @@ class FederationHandler(BaseHandler):
         try:
             event_auth.check(room_version, event, auth_events=auth_events)
         except AuthError as e:
-            logger.warn("Failed auth resolution for %r because %s", event, e)
-            raise e
+            logger.warning("Failed auth resolution for %r because %s", event, e)
+            context.rejected = RejectedReason.AUTH_ERROR
+
+        return context
 
     @defer.inlineCallbacks
     def _update_auth_events_and_context_for_auth(
@@ -2046,7 +2094,7 @@ class FederationHandler(BaseHandler):
             auth_events (dict[(str, str)->synapse.events.EventBase]):
 
         Returns:
-            defer.Deferred[None]
+            defer.Deferred[EventContext]: updated context
         """
         event_auth_events = set(event.auth_event_ids())
 
@@ -2085,7 +2133,7 @@ class FederationHandler(BaseHandler):
                     # The other side isn't around or doesn't implement the
                     # endpoint, so lets just bail out.
                     logger.info("Failed to get event auth from remote: %s", e)
-                    return
+                    return context
 
                 seen_remotes = yield self.store.have_seen_events(
                     [e.event_id for e in remote_auth_chain]
@@ -2126,7 +2174,7 @@ class FederationHandler(BaseHandler):
 
         if event.internal_metadata.is_outlier():
             logger.info("Skipping auth_event fetch for outlier")
-            return
+            return context
 
         # FIXME: Assumes we have and stored all the state for all the
         # prev_events
@@ -2135,7 +2183,7 @@ class FederationHandler(BaseHandler):
         )
 
         if not different_auth:
-            return
+            return context
 
         logger.info(
             "auth_events refers to events which are not in our calculated auth "
@@ -2182,10 +2230,12 @@ class FederationHandler(BaseHandler):
 
             auth_events.update(new_state)
 
-            yield self._update_context_for_auth_events(
+            context = yield self._update_context_for_auth_events(
                 event, context, auth_events, event_key
             )
 
+        return context
+
     @defer.inlineCallbacks
     def _update_context_for_auth_events(self, event, context, auth_events, event_key):
         """Update the state_ids in an event context after auth event resolution,
@@ -2194,14 +2244,16 @@ class FederationHandler(BaseHandler):
         Args:
             event (Event): The event we're handling the context for
 
-            context (synapse.events.snapshot.EventContext): event context
-                to be updated
+            context (synapse.events.snapshot.EventContext): initial event context
 
             auth_events (dict[(str, str)->str]): Events to update in the event
                 context.
 
             event_key ((str, str)): (type, state_key) for the current event.
                 this will not be included in the current_state in the context.
+
+        Returns:
+            Deferred[EventContext]: new event context
         """
         state_updates = {
             k: a.event_id for k, a in iteritems(auth_events) if k != event_key
@@ -2218,7 +2270,7 @@ class FederationHandler(BaseHandler):
 
         # create a new state group as a delta from the existing one.
         prev_group = context.state_group
-        state_group = yield self.store.store_state_group(
+        state_group = yield self.state_store.store_state_group(
             event.event_id,
             event.room_id,
             prev_group=prev_group,
@@ -2226,8 +2278,9 @@ class FederationHandler(BaseHandler):
             current_state_ids=current_state_ids,
         )
 
-        yield context.update_state(
+        return EventContext.with_state(
             state_group=state_group,
+            state_group_before_event=context.state_group_before_event,
             current_state_ids=current_state_ids,
             prev_state_ids=prev_state_ids,
             prev_group=prev_group,
@@ -2415,10 +2468,12 @@ class FederationHandler(BaseHandler):
             try:
                 yield self.auth.check_from_context(room_version, event, context)
             except AuthError as e:
-                logger.warn("Denying new third party invite %r because %s", event, e)
+                logger.warning("Denying new third party invite %r because %s", event, e)
                 raise e
 
             yield self._check_signature(event, context)
+
+            # We retrieve the room member handler here as to not cause a cyclic dependency
             member_handler = self.hs.get_room_member_handler()
             yield member_handler.send_membership_event(None, event, context)
         else:
@@ -2471,7 +2526,7 @@ class FederationHandler(BaseHandler):
         try:
             yield self.auth.check_from_context(room_version, event, context)
         except AuthError as e:
-            logger.warn("Denying third party invite %r because %s", event, e)
+            logger.warning("Denying third party invite %r because %s", event, e)
             raise e
         yield self._check_signature(event, context)
 
@@ -2479,6 +2534,7 @@ class FederationHandler(BaseHandler):
         # though the sender isn't a local user.
         event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
 
+        # We retrieve the room member handler here as to not cause a cyclic dependency
         member_handler = self.hs.get_room_member_handler()
         yield member_handler.send_membership_event(None, event, context)
 
@@ -2648,7 +2704,7 @@ class FederationHandler(BaseHandler):
                 backfilled=backfilled,
             )
         else:
-            max_stream_id = yield self.store.persist_events(
+            max_stream_id = yield self.storage.persistence.persist_events(
                 event_and_contexts, backfilled=backfilled
             )
 
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 46eb9ee88b..92fecbfc44 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -392,7 +392,7 @@ class GroupsLocalHandler(object):
         try:
             user_profile = yield self.profile_handler.get_profile(user_id)
         except Exception as e:
-            logger.warn("No profile for user %s: %s", user_id, e)
+            logger.warning("No profile for user %s: %s", user_id, e)
             user_profile = {}
 
         return {"state": "invite", "user_profile": user_profile}
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index ba99ddf76d..000fbf090f 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -272,7 +272,7 @@ class IdentityHandler(BaseHandler):
             changed = False
             if e.code in (400, 404, 501):
                 # The remote server probably doesn't support unbinding (yet)
-                logger.warn("Received %d response while unbinding threepid", e.code)
+                logger.warning("Received %d response while unbinding threepid", e.code)
             else:
                 logger.error("Failed to unbind threepid on identity server: %s", e)
                 raise SynapseError(500, "Failed to contact identity server")
@@ -403,7 +403,7 @@ class IdentityHandler(BaseHandler):
 
         if self.hs.config.using_identity_server_from_trusted_list:
             # Warn that a deprecated config option is in use
-            logger.warn(
+            logger.warning(
                 'The config option "trust_identity_server_for_password_resets" '
                 'has been replaced by "account_threepid_delegate". '
                 "Please consult the sample config at docs/sample_config.yaml for "
@@ -457,7 +457,7 @@ class IdentityHandler(BaseHandler):
 
         if self.hs.config.using_identity_server_from_trusted_list:
             # Warn that a deprecated config option is in use
-            logger.warn(
+            logger.warning(
                 'The config option "trust_identity_server_for_password_resets" '
                 'has been replaced by "account_threepid_delegate". '
                 "Please consult the sample config at docs/sample_config.yaml for "
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index f991efeee3..81dce96f4b 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -43,6 +43,8 @@ class InitialSyncHandler(BaseHandler):
         self.validator = EventValidator()
         self.snapshot_cache = SnapshotCache()
         self._event_serializer = hs.get_event_client_serializer()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
 
     def snapshot_all_rooms(
         self,
@@ -126,8 +128,8 @@ class InitialSyncHandler(BaseHandler):
 
         tags_by_room = yield self.store.get_tags_for_user(user_id)
 
-        account_data, account_data_by_room = (
-            yield self.store.get_account_data_for_user(user_id)
+        account_data, account_data_by_room = yield self.store.get_account_data_for_user(
+            user_id
         )
 
         public_room_ids = yield self.store.get_public_room_ids()
@@ -169,7 +171,7 @@ class InitialSyncHandler(BaseHandler):
                 elif event.membership == Membership.LEAVE:
                     room_end_token = "s%d" % (event.stream_ordering,)
                     deferred_room_state = run_in_background(
-                        self.store.get_state_for_events, [event.event_id]
+                        self.state_store.get_state_for_events, [event.event_id]
                     )
                     deferred_room_state.addCallback(
                         lambda states: states[event.event_id]
@@ -189,7 +191,9 @@ class InitialSyncHandler(BaseHandler):
                     )
                 ).addErrback(unwrapFirstError)
 
-                messages = yield filter_events_for_client(self.store, user_id, messages)
+                messages = yield filter_events_for_client(
+                    self.storage, user_id, messages
+                )
 
                 start_token = now_token.copy_and_replace("room_key", token)
                 end_token = now_token.copy_and_replace("room_key", room_end_token)
@@ -307,7 +311,7 @@ class InitialSyncHandler(BaseHandler):
     def _room_initial_sync_parted(
         self, user_id, room_id, pagin_config, membership, member_event_id, is_peeking
     ):
-        room_state = yield self.store.get_state_for_events([member_event_id])
+        room_state = yield self.state_store.get_state_for_events([member_event_id])
 
         room_state = room_state[member_event_id]
 
@@ -322,7 +326,7 @@ class InitialSyncHandler(BaseHandler):
         )
 
         messages = yield filter_events_for_client(
-            self.store, user_id, messages, is_peeking=is_peeking
+            self.storage, user_id, messages, is_peeking=is_peeking
         )
 
         start_token = StreamToken.START.copy_and_replace("room_key", token)
@@ -414,7 +418,7 @@ class InitialSyncHandler(BaseHandler):
         )
 
         messages = yield filter_events_for_client(
-            self.store, user_id, messages, is_peeking=is_peeking
+            self.storage, user_id, messages, is_peeking=is_peeking
         )
 
         start_token = now_token.copy_and_replace("room_key", token)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0f8cce8ffe..d682dc2b7a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -59,6 +59,8 @@ class MessageHandler(object):
         self.clock = hs.get_clock()
         self.state = hs.get_state_handler()
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
         self._event_serializer = hs.get_event_client_serializer()
 
     @defer.inlineCallbacks
@@ -74,15 +76,16 @@ class MessageHandler(object):
         Raises:
             SynapseError if something went wrong.
         """
-        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
-            room_id, user_id
-        )
+        (
+            membership,
+            membership_event_id,
+        ) = yield self.auth.check_in_room_or_world_readable(room_id, user_id)
 
         if membership == Membership.JOIN:
             data = yield self.state.get_current_state(room_id, event_type, state_key)
         elif membership == Membership.LEAVE:
             key = (event_type, state_key)
-            room_state = yield self.store.get_state_for_events(
+            room_state = yield self.state_store.get_state_for_events(
                 [membership_event_id], StateFilter.from_types([key])
             )
             data = room_state[membership_event_id].get(key)
@@ -135,12 +138,12 @@ class MessageHandler(object):
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
 
             visible_events = yield filter_events_for_client(
-                self.store, user_id, last_events
+                self.storage, user_id, last_events
             )
 
             event = last_events[0]
             if visible_events:
-                room_state = yield self.store.get_state_for_events(
+                room_state = yield self.state_store.get_state_for_events(
                     [event.event_id], state_filter=state_filter
                 )
                 room_state = room_state[event.event_id]
@@ -151,9 +154,10 @@ class MessageHandler(object):
                     % (user_id, room_id, at_token),
                 )
         else:
-            membership, membership_event_id = (
-                yield self.auth.check_in_room_or_world_readable(room_id, user_id)
-            )
+            (
+                membership,
+                membership_event_id,
+            ) = yield self.auth.check_in_room_or_world_readable(room_id, user_id)
 
             if membership == Membership.JOIN:
                 state_ids = yield self.store.get_filtered_current_state_ids(
@@ -161,7 +165,7 @@ class MessageHandler(object):
                 )
                 room_state = yield self.store.get_events(state_ids.values())
             elif membership == Membership.LEAVE:
-                room_state = yield self.store.get_state_for_events(
+                room_state = yield self.state_store.get_state_for_events(
                     [membership_event_id], state_filter=state_filter
                 )
                 room_state = room_state[membership_event_id]
@@ -234,6 +238,7 @@ class EventCreationHandler(object):
         self.hs = hs
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
         self.state = hs.get_state_handler()
         self.clock = hs.get_clock()
         self.validator = EventValidator()
@@ -687,7 +692,7 @@ class EventCreationHandler(object):
         try:
             yield self.auth.check_from_context(room_version, event, context)
         except AuthError as err:
-            logger.warn("Denying new event %r because %s", event, err)
+            logger.warning("Denying new event %r because %s", event, err)
             raise err
 
         # Ensure that we can round trip before trying to persist in db
@@ -868,7 +873,7 @@ class EventCreationHandler(object):
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")
 
-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+        event_stream_id, max_stream_id = yield self.storage.persistence.persist_event(
             event, context=context
         )
 
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 5744f4579d..260a4351ca 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -69,6 +69,8 @@ class PaginationHandler(object):
         self.hs = hs
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
         self.clock = hs.get_clock()
         self._server_name = hs.hostname
 
@@ -125,7 +127,9 @@ class PaginationHandler(object):
         self._purges_in_progress_by_room.add(room_id)
         try:
             with (yield self.pagination_lock.write(room_id)):
-                yield self.store.purge_history(room_id, token, delete_local_events)
+                yield self.storage.purge_events.purge_history(
+                    room_id, token, delete_local_events
+                )
             logger.info("[purge] complete")
             self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
         except Exception:
@@ -168,7 +172,7 @@ class PaginationHandler(object):
             if joined:
                 raise SynapseError(400, "Users are still joined to this room")
 
-            await self.store.purge_room(room_id)
+            await self.storage.purge_events.purge_room(room_id)
 
     @defer.inlineCallbacks
     def get_messages(
@@ -210,9 +214,10 @@ class PaginationHandler(object):
         source_config = pagin_config.get_source_config("room")
 
         with (yield self.pagination_lock.read(room_id)):
-            membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
-                room_id, user_id
-            )
+            (
+                membership,
+                member_event_id,
+            ) = yield self.auth.check_in_room_or_world_readable(room_id, user_id)
 
             if source_config.direction == "b":
                 # if we're going backwards, we might need to backfill. This
@@ -255,7 +260,7 @@ class PaginationHandler(object):
                 events = event_filter.filter(events)
 
             events = yield filter_events_for_client(
-                self.store, user_id, events, is_peeking=(member_event_id is None)
+                self.storage, user_id, events, is_peeking=(member_event_id is None)
             )
 
         if not events:
@@ -274,7 +279,7 @@ class PaginationHandler(object):
                 (EventTypes.Member, event.sender) for event in events
             )
 
-            state_ids = yield self.store.get_state_ids_for_event(
+            state_ids = yield self.state_store.get_state_ids_for_event(
                 events[0].event_id, state_filter=state_filter
             )
 
@@ -295,10 +300,8 @@ class PaginationHandler(object):
         }
 
         if state:
-            chunk["state"] = (
-                yield self._event_serializer.serialize_events(
-                    state, time_now, as_client_event=as_client_event
-                )
+            chunk["state"] = yield self._event_serializer.serialize_events(
+                state, time_now, as_client_event=as_client_event
             )
 
         return chunk
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 8690f69d45..22e0a04da4 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -275,7 +275,7 @@ class BaseProfileHandler(BaseHandler):
                     ratelimit=False,  # Try to hide that these events aren't atomic.
                 )
             except Exception as e:
-                logger.warn(
+                logger.warning(
                     "Failed to update join event for room %s - %s", room_id, str(e)
                 )
 
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 3e4d8c93a4..e3b528d271 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from twisted.internet import defer
-
 from synapse.util.async_helpers import Linearizer
 
 from ._base import BaseHandler
@@ -32,8 +30,7 @@ class ReadMarkerHandler(BaseHandler):
         self.read_marker_linearizer = Linearizer(name="read_marker")
         self.notifier = hs.get_notifier()
 
-    @defer.inlineCallbacks
-    def received_client_read_marker(self, room_id, user_id, event_id):
+    async def received_client_read_marker(self, room_id, user_id, event_id):
         """Updates the read marker for a given user in a given room if the event ID given
         is ahead in the stream relative to the current read marker.
 
@@ -41,8 +38,8 @@ class ReadMarkerHandler(BaseHandler):
         the read marker has changed.
         """
 
-        with (yield self.read_marker_linearizer.queue((room_id, user_id))):
-            existing_read_marker = yield self.store.get_account_data_for_room_and_type(
+        with await self.read_marker_linearizer.queue((room_id, user_id)):
+            existing_read_marker = await self.store.get_account_data_for_room_and_type(
                 user_id, room_id, "m.fully_read"
             )
 
@@ -50,13 +47,13 @@ class ReadMarkerHandler(BaseHandler):
 
             if existing_read_marker:
                 # Only update if the new marker is ahead in the stream
-                should_update = yield self.store.is_event_after(
+                should_update = await self.store.is_event_after(
                     event_id, existing_read_marker["event_id"]
                 )
 
             if should_update:
                 content = {"event_id": event_id}
-                max_id = yield self.store.add_account_data_to_room(
+                max_id = await self.store.add_account_data_to_room(
                     user_id, room_id, "m.fully_read", content
                 )
                 self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 6854c751a6..9283c039e3 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 
 from synapse.handlers._base import BaseHandler
 from synapse.types import ReadReceipt, get_domain_from_id
+from synapse.util.async_helpers import maybe_awaitable
 
 logger = logging.getLogger(__name__)
 
@@ -36,8 +37,7 @@ class ReceiptsHandler(BaseHandler):
         self.clock = self.hs.get_clock()
         self.state = hs.get_state_handler()
 
-    @defer.inlineCallbacks
-    def _received_remote_receipt(self, origin, content):
+    async def _received_remote_receipt(self, origin, content):
         """Called when we receive an EDU of type m.receipt from a remote HS.
         """
         receipts = []
@@ -62,17 +62,16 @@ class ReceiptsHandler(BaseHandler):
                         )
                     )
 
-        yield self._handle_new_receipts(receipts)
+        await self._handle_new_receipts(receipts)
 
-    @defer.inlineCallbacks
-    def _handle_new_receipts(self, receipts):
+    async def _handle_new_receipts(self, receipts):
         """Takes a list of receipts, stores them and informs the notifier.
         """
         min_batch_id = None
         max_batch_id = None
 
         for receipt in receipts:
-            res = yield self.store.insert_receipt(
+            res = await self.store.insert_receipt(
                 receipt.room_id,
                 receipt.receipt_type,
                 receipt.user_id,
@@ -99,14 +98,15 @@ class ReceiptsHandler(BaseHandler):
 
         self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids)
         # Note that the min here shouldn't be relied upon to be accurate.
-        yield self.hs.get_pusherpool().on_new_receipts(
-            min_batch_id, max_batch_id, affected_room_ids
+        await maybe_awaitable(
+            self.hs.get_pusherpool().on_new_receipts(
+                min_batch_id, max_batch_id, affected_room_ids
+            )
         )
 
         return True
 
-    @defer.inlineCallbacks
-    def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
+    async def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
         """Called when a client tells us a local user has read up to the given
         event_id in the room.
         """
@@ -118,24 +118,11 @@ class ReceiptsHandler(BaseHandler):
             data={"ts": int(self.clock.time_msec())},
         )
 
-        is_new = yield self._handle_new_receipts([receipt])
+        is_new = await self._handle_new_receipts([receipt])
         if not is_new:
             return
 
-        yield self.federation.send_read_receipt(receipt)
-
-    @defer.inlineCallbacks
-    def get_receipts_for_room(self, room_id, to_key):
-        """Gets all receipts for a room, upto the given key.
-        """
-        result = yield self.store.get_linearized_receipts_for_room(
-            room_id, to_key=to_key
-        )
-
-        if not result:
-            return []
-
-        return result
+        await self.federation.send_read_receipt(receipt)
 
 
 class ReceiptEventSource(object):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 53410f120b..235f11c322 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -24,7 +24,6 @@ from synapse.api.errors import (
     AuthError,
     Codes,
     ConsentNotGivenError,
-    LimitExceededError,
     RegistrationError,
     SynapseError,
 )
@@ -168,6 +167,7 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
+        yield self.check_registration_ratelimit(address)
 
         yield self.auth.check_auth_blocking(threepid=threepid)
         password_hash = None
@@ -217,8 +217,13 @@ class RegistrationHandler(BaseHandler):
 
         else:
             # autogen a sequential user ID
+            fail_count = 0
             user = None
             while not user:
+                # Fail after being unable to find a suitable ID a few times
+                if fail_count > 10:
+                    raise SynapseError(500, "Unable to find a suitable guest user ID")
+
                 localpart = yield self._generate_user_id()
                 user = UserID(localpart, self.hs.hostname)
                 user_id = user.to_string()
@@ -233,10 +238,14 @@ class RegistrationHandler(BaseHandler):
                         create_profile_with_displayname=default_display_name,
                         address=address,
                     )
+
+                    # Successfully registered
+                    break
                 except SynapseError:
                     # if user id is taken, just generate another
                     user = None
                     user_id = None
+                    fail_count += 1
 
         if not self.hs.config.user_consent_at_registration:
             yield self._auto_join_rooms(user_id)
@@ -396,8 +405,8 @@ class RegistrationHandler(BaseHandler):
             room_id = room_identifier
         elif RoomAlias.is_valid(room_identifier):
             room_alias = RoomAlias.from_string(room_identifier)
-            room_id, remote_room_hosts = (
-                yield room_member_handler.lookup_room_alias(room_alias)
+            room_id, remote_room_hosts = yield room_member_handler.lookup_room_alias(
+                room_alias
             )
             room_id = room_id.to_string()
         else:
@@ -414,6 +423,29 @@ class RegistrationHandler(BaseHandler):
             ratelimit=False,
         )
 
+    def check_registration_ratelimit(self, address):
+        """A simple helper method to check whether the registration rate limit has been hit
+        for a given IP address
+
+        Args:
+            address (str|None): the IP address used to perform the registration. If this is
+                None, no ratelimiting will be performed.
+
+        Raises:
+            LimitExceededError: If the rate limit has been exceeded.
+        """
+        if not address:
+            return
+
+        time_now = self.clock.time()
+
+        self.ratelimiter.ratelimit(
+            address,
+            time_now_s=time_now,
+            rate_hz=self.hs.config.rc_registration.per_second,
+            burst_count=self.hs.config.rc_registration.burst_count,
+        )
+
     def register_with_store(
         self,
         user_id,
@@ -446,22 +478,6 @@ class RegistrationHandler(BaseHandler):
         Returns:
             Deferred
         """
-        # Don't rate limit for app services
-        if appservice_id is None and address is not None:
-            time_now = self.clock.time()
-
-            allowed, time_allowed = self.ratelimiter.can_do_action(
-                address,
-                time_now_s=time_now,
-                rate_hz=self.hs.config.rc_registration.per_second,
-                burst_count=self.hs.config.rc_registration.burst_count,
-            )
-
-            if not allowed:
-                raise LimitExceededError(
-                    retry_after_ms=int(1000 * (time_allowed - time_now))
-                )
-
         if self.hs.config.worker_app:
             return self._register_client(
                 user_id=user_id,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2816bd8f87..e92b2eafd5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -129,6 +129,7 @@ class RoomCreationHandler(BaseHandler):
             old_room_id,
             new_version,  # args for _upgrade_room
         )
+
         return ret
 
     @defer.inlineCallbacks
@@ -147,21 +148,22 @@ class RoomCreationHandler(BaseHandler):
 
         # we create and auth the tombstone event before properly creating the new
         # room, to check our user has perms in the old room.
-        tombstone_event, tombstone_context = (
-            yield self.event_creation_handler.create_event(
-                requester,
-                {
-                    "type": EventTypes.Tombstone,
-                    "state_key": "",
-                    "room_id": old_room_id,
-                    "sender": user_id,
-                    "content": {
-                        "body": "This room has been replaced",
-                        "replacement_room": new_room_id,
-                    },
+        (
+            tombstone_event,
+            tombstone_context,
+        ) = yield self.event_creation_handler.create_event(
+            requester,
+            {
+                "type": EventTypes.Tombstone,
+                "state_key": "",
+                "room_id": old_room_id,
+                "sender": user_id,
+                "content": {
+                    "body": "This room has been replaced",
+                    "replacement_room": new_room_id,
                 },
-                token_id=requester.access_token_id,
-            )
+            },
+            token_id=requester.access_token_id,
         )
         old_room_version = yield self.store.get_room_version(old_room_id)
         yield self.auth.check_from_context(
@@ -188,7 +190,12 @@ class RoomCreationHandler(BaseHandler):
             requester, old_room_id, new_room_id, old_room_state
         )
 
-        # and finally, shut down the PLs in the old room, and update them in the new
+        # Copy over user push rules, tags and migrate room directory state
+        yield self.room_member_handler.transfer_room_state_on_room_upgrade(
+            old_room_id, new_room_id
+        )
+
+        # finally, shut down the PLs in the old room, and update them in the new
         # room.
         yield self._update_upgraded_room_pls(
             requester, old_room_id, new_room_id, old_room_state
@@ -822,6 +829,8 @@ class RoomContextHandler(object):
     def __init__(self, hs):
         self.hs = hs
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
 
     @defer.inlineCallbacks
     def get_event_context(self, user, room_id, event_id, limit, event_filter):
@@ -848,7 +857,7 @@ class RoomContextHandler(object):
 
         def filter_evts(events):
             return filter_events_for_client(
-                self.store, user.to_string(), events, is_peeking=is_peeking
+                self.storage, user.to_string(), events, is_peeking=is_peeking
             )
 
         event = yield self.store.get_event(
@@ -890,7 +899,7 @@ class RoomContextHandler(object):
         # first? Shouldn't we be consistent with /sync?
         # https://github.com/matrix-org/matrix-doc/issues/687
 
-        state = yield self.store.get_state_for_events(
+        state = yield self.state_store.get_state_for_events(
             [last_event_id], state_filter=state_filter
         )
         results["state"] = list(state[last_event_id].values())
@@ -922,7 +931,7 @@ class RoomEventSource(object):
 
         from_token = RoomStreamToken.parse(from_key)
         if from_token.topological:
-            logger.warn("Stream has topological part!!!! %r", from_key)
+            logger.warning("Stream has topological part!!!! %r", from_key)
             from_key = "s%s" % (from_token.stream,)
 
         app_service = self.store.get_app_service_by_user_id(user.to_string())
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 380e2fad5e..6cfee4b361 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -203,10 +203,6 @@ class RoomMemberHandler(object):
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
                 newly_joined = prev_member_event.membership != Membership.JOIN
             if newly_joined:
-                # Copy over user state if we're joining an upgraded room
-                yield self.copy_user_state_if_room_upgrade(
-                    room_id, requester.user.to_string()
-                )
                 yield self._user_joined_room(target, room_id)
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
@@ -455,11 +451,6 @@ class RoomMemberHandler(object):
                     requester, remote_room_hosts, room_id, target, content
                 )
 
-                # Copy over user state if this is a join on an remote upgraded room
-                yield self.copy_user_state_if_room_upgrade(
-                    room_id, requester.user.to_string()
-                )
-
                 return remote_join_response
 
         elif effective_membership_state == Membership.LEAVE:
@@ -498,36 +489,81 @@ class RoomMemberHandler(object):
         return res
 
     @defer.inlineCallbacks
-    def copy_user_state_if_room_upgrade(self, new_room_id, user_id):
-        """Copy user-specific information when they join a new room if that new room is the
+    def transfer_room_state_on_room_upgrade(self, old_room_id, room_id):
+        """Upon our server becoming aware of an upgraded room, either by upgrading a room
+        ourselves or joining one, we can transfer over information from the previous room.
+
+        Copies user state (tags/push rules) for every local user that was in the old room, as
+        well as migrating the room directory state.
+
+        Args:
+            old_room_id (str): The ID of the old room
+
+            room_id (str): The ID of the new room
+
+        Returns:
+            Deferred
+        """
+        # Find all local users that were in the old room and copy over each user's state
+        users = yield self.store.get_users_in_room(old_room_id)
+        yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)
+
+        # Add new room to the room directory if the old room was there
+        # Remove old room from the room directory
+        old_room = yield self.store.get_room(old_room_id)
+        if old_room and old_room["is_public"]:
+            yield self.store.set_room_is_public(old_room_id, False)
+            yield self.store.set_room_is_public(room_id, True)
+
+        # Check if any groups we own contain the predecessor room
+        local_group_ids = yield self.store.get_local_groups_for_room(old_room_id)
+        for group_id in local_group_ids:
+            # Add new the new room to those groups
+            yield self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
+
+            # Remove the old room from those groups
+            yield self.store.remove_room_from_group(group_id, old_room_id)
+
+    @defer.inlineCallbacks
+    def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids):
+        """Copy user-specific information when they join a new room when that new room is the
         result of a room upgrade
 
         Args:
-            new_room_id (str): The ID of the room the user is joining
-            user_id (str): The ID of the user
+            old_room_id (str): The ID of upgraded room
+            new_room_id (str): The ID of the new room
+            user_ids (Iterable[str]): User IDs to copy state for
 
         Returns:
             Deferred
         """
-        # Check if the new room is an upgraded room
-        predecessor = yield self.store.get_room_predecessor(new_room_id)
-        if not predecessor:
-            return
 
         logger.debug(
-            "Found predecessor for %s: %s. Copying over room tags and push " "rules",
+            "Copying over room tags and push rules from %s to %s for users %s",
+            old_room_id,
             new_room_id,
-            predecessor,
+            user_ids,
         )
 
-        # It is an upgraded room. Copy over old tags
-        yield self.copy_room_tags_and_direct_to_room(
-            predecessor["room_id"], new_room_id, user_id
-        )
-        # Copy over push rules
-        yield self.store.copy_push_rules_from_room_to_room_for_user(
-            predecessor["room_id"], new_room_id, user_id
-        )
+        for user_id in user_ids:
+            try:
+                # It is an upgraded room. Copy over old tags
+                yield self.copy_room_tags_and_direct_to_room(
+                    old_room_id, new_room_id, user_id
+                )
+                # Copy over push rules
+                yield self.store.copy_push_rules_from_room_to_room_for_user(
+                    old_room_id, new_room_id, user_id
+                )
+            except Exception:
+                logger.exception(
+                    "Error copying tags and/or push rules from rooms %s to %s for user %s. "
+                    "Skipping...",
+                    old_room_id,
+                    new_room_id,
+                    user_id,
+                )
+                continue
 
     @defer.inlineCallbacks
     def send_membership_event(self, requester, event, context, ratelimit=True):
@@ -759,22 +795,25 @@ class RoomMemberHandler(object):
         if room_avatar_event:
             room_avatar_url = room_avatar_event.content.get("url", "")
 
-        token, public_keys, fallback_public_key, display_name = (
-            yield self.identity_handler.ask_id_server_for_third_party_invite(
-                requester=requester,
-                id_server=id_server,
-                medium=medium,
-                address=address,
-                room_id=room_id,
-                inviter_user_id=user.to_string(),
-                room_alias=canonical_room_alias,
-                room_avatar_url=room_avatar_url,
-                room_join_rules=room_join_rules,
-                room_name=room_name,
-                inviter_display_name=inviter_display_name,
-                inviter_avatar_url=inviter_avatar_url,
-                id_access_token=id_access_token,
-            )
+        (
+            token,
+            public_keys,
+            fallback_public_key,
+            display_name,
+        ) = yield self.identity_handler.ask_id_server_for_third_party_invite(
+            requester=requester,
+            id_server=id_server,
+            medium=medium,
+            address=address,
+            room_id=room_id,
+            inviter_user_id=user.to_string(),
+            room_alias=canonical_room_alias,
+            room_avatar_url=room_avatar_url,
+            room_join_rules=room_join_rules,
+            room_name=room_name,
+            inviter_display_name=inviter_display_name,
+            inviter_avatar_url=inviter_avatar_url,
+            id_access_token=id_access_token,
         )
 
         yield self.event_creation_handler.create_and_send_nonmember_event(
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index cd5e90bacb..56ed262a1f 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -35,6 +35,8 @@ class SearchHandler(BaseHandler):
     def __init__(self, hs):
         super(SearchHandler, self).__init__(hs)
         self._event_serializer = hs.get_event_client_serializer()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
 
     @defer.inlineCallbacks
     def get_old_rooms_from_upgraded_room(self, room_id):
@@ -221,7 +223,7 @@ class SearchHandler(BaseHandler):
             filtered_events = search_filter.filter([r["event"] for r in results])
 
             events = yield filter_events_for_client(
-                self.store, user.to_string(), filtered_events
+                self.storage, user.to_string(), filtered_events
             )
 
             events.sort(key=lambda e: -rank_map[e.event_id])
@@ -271,7 +273,7 @@ class SearchHandler(BaseHandler):
                 filtered_events = search_filter.filter([r["event"] for r in results])
 
                 events = yield filter_events_for_client(
-                    self.store, user.to_string(), filtered_events
+                    self.storage, user.to_string(), filtered_events
                 )
 
                 room_events.extend(events)
@@ -340,11 +342,11 @@ class SearchHandler(BaseHandler):
                 )
 
                 res["events_before"] = yield filter_events_for_client(
-                    self.store, user.to_string(), res["events_before"]
+                    self.storage, user.to_string(), res["events_before"]
                 )
 
                 res["events_after"] = yield filter_events_for_client(
-                    self.store, user.to_string(), res["events_after"]
+                    self.storage, user.to_string(), res["events_after"]
                 )
 
                 res["start"] = now_token.copy_and_replace(
@@ -372,7 +374,7 @@ class SearchHandler(BaseHandler):
                         [(EventTypes.Member, sender) for sender in senders]
                     )
 
-                    state = yield self.store.get_state_for_event(
+                    state = yield self.state_store.get_state_for_event(
                         last_event_id, state_filter
                     )
 
@@ -394,15 +396,11 @@ class SearchHandler(BaseHandler):
         time_now = self.clock.time_msec()
 
         for context in contexts.values():
-            context["events_before"] = (
-                yield self._event_serializer.serialize_events(
-                    context["events_before"], time_now
-                )
+            context["events_before"] = yield self._event_serializer.serialize_events(
+                context["events_before"], time_now
             )
-            context["events_after"] = (
-                yield self._event_serializer.serialize_events(
-                    context["events_after"], time_now
-                )
+            context["events_after"] = yield self._event_serializer.serialize_events(
+                context["events_after"], time_now
             )
 
         state_results = {}
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 466daf9202..7f7d56390e 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -45,6 +45,8 @@ class StatsHandler(StateDeltasHandler):
         self.is_mine_id = hs.is_mine_id
         self.stats_bucket_size = hs.config.stats_bucket_size
 
+        self.stats_enabled = hs.config.stats_enabled
+
         # The current position in the current_state_delta stream
         self.pos = None
 
@@ -61,7 +63,7 @@ class StatsHandler(StateDeltasHandler):
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
-        if not self.hs.config.stats_enabled or self._is_processing:
+        if not self.stats_enabled or self._is_processing:
             return
 
         self._is_processing = True
@@ -106,7 +108,10 @@ class StatsHandler(StateDeltasHandler):
                 user_deltas = {}
 
             # Then count deltas for total_events and total_event_bytes.
-            room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
+            (
+                room_count,
+                user_count,
+            ) = yield self.store.get_changes_room_total_events_and_bytes(
                 self.pos, max_pos
             )
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d99160e9d7..b536d410e5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -230,6 +230,8 @@ class SyncHandler(object):
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
 
         # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
         self.lazy_loaded_members_cache = ExpiringCache(
@@ -417,7 +419,7 @@ class SyncHandler(object):
                     current_state_ids = frozenset(itervalues(current_state_ids))
 
                 recents = yield filter_events_for_client(
-                    self.store,
+                    self.storage,
                     sync_config.user.to_string(),
                     recents,
                     always_include_ids=current_state_ids,
@@ -470,7 +472,7 @@ class SyncHandler(object):
                     current_state_ids = frozenset(itervalues(current_state_ids))
 
                 loaded_recents = yield filter_events_for_client(
-                    self.store,
+                    self.storage,
                     sync_config.user.to_string(),
                     loaded_recents,
                     always_include_ids=current_state_ids,
@@ -509,7 +511,7 @@ class SyncHandler(object):
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
-        state_ids = yield self.store.get_state_ids_for_event(
+        state_ids = yield self.state_store.get_state_ids_for_event(
             event.event_id, state_filter=state_filter
         )
         if event.is_state():
@@ -580,7 +582,7 @@ class SyncHandler(object):
             return None
 
         last_event = last_events[-1]
-        state_ids = yield self.store.get_state_ids_for_event(
+        state_ids = yield self.state_store.get_state_ids_for_event(
             last_event.event_id,
             state_filter=StateFilter.from_types(
                 [(EventTypes.Name, ""), (EventTypes.CanonicalAlias, "")]
@@ -757,11 +759,11 @@ class SyncHandler(object):
 
             if full_state:
                 if batch:
-                    current_state_ids = yield self.store.get_state_ids_for_event(
+                    current_state_ids = yield self.state_store.get_state_ids_for_event(
                         batch.events[-1].event_id, state_filter=state_filter
                     )
 
-                    state_ids = yield self.store.get_state_ids_for_event(
+                    state_ids = yield self.state_store.get_state_ids_for_event(
                         batch.events[0].event_id, state_filter=state_filter
                     )
 
@@ -781,7 +783,7 @@ class SyncHandler(object):
                 )
             elif batch.limited:
                 if batch:
-                    state_at_timeline_start = yield self.store.get_state_ids_for_event(
+                    state_at_timeline_start = yield self.state_store.get_state_ids_for_event(
                         batch.events[0].event_id, state_filter=state_filter
                     )
                 else:
@@ -810,7 +812,7 @@ class SyncHandler(object):
                 )
 
                 if batch:
-                    current_state_ids = yield self.store.get_state_ids_for_event(
+                    current_state_ids = yield self.state_store.get_state_ids_for_event(
                         batch.events[-1].event_id, state_filter=state_filter
                     )
                 else:
@@ -841,7 +843,7 @@ class SyncHandler(object):
                         # So we fish out all the member events corresponding to the
                         # timeline here, and then dedupe any redundant ones below.
 
-                        state_ids = yield self.store.get_state_ids_for_event(
+                        state_ids = yield self.state_store.get_state_ids_for_event(
                             batch.events[0].event_id,
                             # we only want members!
                             state_filter=StateFilter.from_types(
@@ -1204,10 +1206,11 @@ class SyncHandler(object):
         since_token = sync_result_builder.since_token
 
         if since_token and not sync_result_builder.full_state:
-            account_data, account_data_by_room = (
-                yield self.store.get_updated_account_data_for_user(
-                    user_id, since_token.account_data_key
-                )
+            (
+                account_data,
+                account_data_by_room,
+            ) = yield self.store.get_updated_account_data_for_user(
+                user_id, since_token.account_data_key
             )
 
             push_rules_changed = yield self.store.have_push_rules_changed_for_user(
@@ -1219,9 +1222,10 @@ class SyncHandler(object):
                     sync_config.user
                 )
         else:
-            account_data, account_data_by_room = (
-                yield self.store.get_account_data_for_user(sync_config.user.to_string())
-            )
+            (
+                account_data,
+                account_data_by_room,
+            ) = yield self.store.get_account_data_for_user(sync_config.user.to_string())
 
             account_data["m.push_rules"] = yield self.push_rules_for_user(
                 sync_config.user
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index 29aa1e5aaf..8363d887a9 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -81,7 +81,7 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
     def __init__(self, hs):
         super().__init__(hs)
         self._enabled = bool(hs.config.recaptcha_private_key)
-        self._http_client = hs.get_simple_http_client()
+        self._http_client = hs.get_proxied_http_client()
         self._url = hs.config.recaptcha_siteverify_api
         self._secret = hs.config.recaptcha_private_key