Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/__init__.py       |   2
-rw-r--r--  synapse/handlers/_base.py          |   2
-rw-r--r--  synapse/handlers/acme.py           | 151
-rw-r--r--  synapse/handlers/auth.py           |  13
-rw-r--r--  synapse/handlers/device.py         |  33
-rw-r--r--  synapse/handlers/directory.py      |  33
-rw-r--r--  synapse/handlers/e2e_room_keys.py  |  74
-rw-r--r--  synapse/handlers/federation.py     | 160
-rw-r--r--  synapse/handlers/groups_local.py   |  12
-rw-r--r--  synapse/handlers/identity.py       |   9
-rw-r--r--  synapse/handlers/message.py        |  61
-rw-r--r--  synapse/handlers/pagination.py     |  21
-rw-r--r--  synapse/handlers/register.py       | 379
-rw-r--r--  synapse/handlers/room.py           |  46
-rw-r--r--  synapse/handlers/room_list.py      |  14
-rw-r--r--  synapse/handlers/room_member.py    |  90
-rw-r--r--  synapse/handlers/search.py         |  47
-rw-r--r--  synapse/handlers/sync.py           |  37
-rw-r--r--  synapse/handlers/user_directory.py | 137
19 files changed, 1045 insertions(+), 276 deletions(-)
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 413425fed1..2dd183018a 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,7 +17,6 @@ from .admin import AdminHandler
 from .directory import DirectoryHandler
 from .federation import FederationHandler
 from .identity import IdentityHandler
-from .register import RegistrationHandler
 from .search import SearchHandler
 
 
@@ -41,7 +40,6 @@ class Handlers(object):
     """
 
     def __init__(self, hs):
-        self.registration_handler = RegistrationHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 704181d2d3..594754cfd8 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -167,4 +167,4 @@ class BaseHandler(object):
                     ratelimit=False,
                 )
             except Exception as e:
-                logger.warn("Error kicking guest user: %s" % (e,))
+                logger.exception("Error kicking guest user: %s" % (e,))
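
For context on the hunk above: logger.exception logs at ERROR level and appends the traceback of the exception currently being handled, so it only makes sense inside an except block; logger.warn records the message alone. A minimal standalone sketch of the difference, not part of this commit:

    import logging

    logging.basicConfig()
    logger = logging.getLogger(__name__)

    try:
        raise RuntimeError("boom")
    except Exception as e:
        # warning(): records only the message; the traceback is lost.
        logger.warning("Error kicking guest user: %s" % (e,))
        # exception(): records the message plus the full traceback, at ERROR.
        logger.exception("Error kicking guest user: %s" % (e,))
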
diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py
new file mode 100644
index 0000000000..813777bf18
--- /dev/null
+++ b/synapse/handlers/acme.py
@@ -0,0 +1,151 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import attr
+from zope.interface import implementer
+
+import twisted
+import twisted.internet.error
+from twisted.internet import defer
+from twisted.python.filepath import FilePath
+from twisted.python.url import URL
+from twisted.web import server, static
+from twisted.web.resource import Resource
+
+from synapse.app import check_bind_error
+
+logger = logging.getLogger(__name__)
+
+try:
+    from txacme.interfaces import ICertificateStore
+
+    @attr.s
+    @implementer(ICertificateStore)
+    class ErsatzStore(object):
+        """
+        A store that only stores in memory.
+        """
+
+        certs = attr.ib(default=attr.Factory(dict))
+
+        def store(self, server_name, pem_objects):
+            self.certs[server_name] = [o.as_bytes() for o in pem_objects]
+            return defer.succeed(None)
+
+
+except ImportError:
+    # txacme is missing
+    pass
+
+
+class AcmeHandler(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.reactor = hs.get_reactor()
+        self._acme_domain = hs.config.acme_domain
+
+    @defer.inlineCallbacks
+    def start_listening(self):
+
+        # Configure logging for txacme, if you need to debug
+        # from eliot import add_destinations
+        # from eliot.twisted import TwistedDestination
+        #
+        # add_destinations(TwistedDestination())
+
+        from txacme.challenges import HTTP01Responder
+        from txacme.service import AcmeIssuingService
+        from txacme.endpoint import load_or_create_client_key
+        from txacme.client import Client
+        from josepy.jwa import RS256
+
+        self._store = ErsatzStore()
+        responder = HTTP01Responder()
+
+        self._issuer = AcmeIssuingService(
+            cert_store=self._store,
+            client_creator=(
+                lambda: Client.from_url(
+                    reactor=self.reactor,
+                    url=URL.from_text(self.hs.config.acme_url),
+                    key=load_or_create_client_key(
+                        FilePath(self.hs.config.config_dir_path)
+                    ),
+                    alg=RS256,
+                )
+            ),
+            clock=self.reactor,
+            responders=[responder],
+        )
+
+        well_known = Resource()
+        well_known.putChild(b'acme-challenge', responder.resource)
+        responder_resource = Resource()
+        responder_resource.putChild(b'.well-known', well_known)
+        responder_resource.putChild(b'check', static.Data(b'OK', b'text/plain'))
+
+        srv = server.Site(responder_resource)
+
+        bind_addresses = self.hs.config.acme_bind_addresses
+        for host in bind_addresses:
+            logger.info(
+                "Listening for ACME requests on %s:%i", host, self.hs.config.acme_port,
+            )
+            try:
+                self.reactor.listenTCP(
+                    self.hs.config.acme_port,
+                    srv,
+                    interface=host,
+                )
+            except twisted.internet.error.CannotListenError as e:
+                check_bind_error(e, host, bind_addresses)
+
+        # Make sure we are registered to the ACME server. There's no public API
+        # for this, it is usually triggered by startService, but since we don't
+        # want it to control where we save the certificates, we have to reach in
+        # and trigger the registration machinery ourselves.
+        self._issuer._registered = False
+        yield self._issuer._ensure_registered()
+
+    @defer.inlineCallbacks
+    def provision_certificate(self):
+
+        logger.warning("Reprovisioning %s", self._acme_domain)
+
+        try:
+            yield self._issuer.issue_cert(self._acme_domain)
+        except Exception:
+            logger.exception("Fail!")
+            raise
+        logger.warning("Reprovisioned %s, saving.", self._acme_domain)
+        cert_chain = self._store.certs[self._acme_domain]
+
+        try:
+            with open(self.hs.config.tls_private_key_file, "wb") as private_key_file:
+                for x in cert_chain:
+                    if x.startswith(b"-----BEGIN RSA PRIVATE KEY-----"):
+                        private_key_file.write(x)
+
+            with open(self.hs.config.tls_certificate_file, "wb") as certificate_file:
+                for x in cert_chain:
+                    if x.startswith(b"-----BEGIN CERTIFICATE-----"):
+                        certificate_file.write(x)
+        except Exception:
+            logger.exception("Failed saving!")
+            raise
+
+        defer.returnValue(True)
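
Putting the new handler together: it is driven in two steps, one to bind the HTTP-01 responder and register with the ACME server, and one to actually issue a certificate. A hedged usage sketch, assuming the homeserver exposes the handler as hs.get_acme_handler() (that wiring is not part of this diff):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def provision(hs):
        acme = hs.get_acme_handler()        # assumed accessor, see note above
        yield acme.start_listening()        # binds /.well-known/acme-challenge
        yield acme.provision_certificate()  # issues cert, writes key and chain
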
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index c6e89db4bc..2abd9af94f 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -563,10 +563,10 @@ class AuthHandler(BaseHandler):
         insensitively, but return None if there are multiple inexact matches.
 
         Args:
-            (str) user_id: complete @user:id
+            (unicode|bytes) user_id: complete @user:id
 
         Returns:
-            defer.Deferred: (str) canonical_user_id, or None if zero or
+            defer.Deferred: (unicode) canonical_user_id, or None if zero or
             multiple matches
         """
         res = yield self._find_user_id_and_pwd_hash(user_id)
@@ -954,6 +954,15 @@ class MacaroonGenerator(object):
         return macaroon.serialize()
 
     def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
+        """
+
+        Args:
+            user_id (unicode):
+            duration_in_ms (int):
+
+        Returns:
+            unicode
+        """
         macaroon = self._generate_base_macaroon(user_id)
         macaroon.add_first_party_caveat("type = login")
         now = self.hs.get_clock().time_msec()
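
The token this method returns is a serialised macaroon whose first-party caveats pin the user, the token type, and an expiry. A sketch of the caveat set (the gen and user_id caveats come from the base macaroon built elsewhere in this file; values are illustrative):

    token = macaroon_gen.generate_short_term_login_token("@alice:example.com")
    # The serialised macaroon carries first-party caveats along the lines of:
    #   gen = 1
    #   user_id = @alice:example.com
    #   type = login
    #   time < 1545000120000    # now + duration_in_ms (default 2 minutes)
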
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9e017116a9..c708c35d4d 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -20,7 +20,11 @@ from twisted.internet import defer
 
 from synapse.api import errors
 from synapse.api.constants import EventTypes
-from synapse.api.errors import FederationDeniedError
+from synapse.api.errors import (
+    FederationDeniedError,
+    HttpResponseException,
+    RequestSendFailed,
+)
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
@@ -504,13 +508,13 @@ class DeviceListEduUpdater(object):
                 origin = get_domain_from_id(user_id)
                 try:
                     result = yield self.federation.query_user_devices(origin, user_id)
-                except NotRetryingDestination:
+                except (
+                    NotRetryingDestination, RequestSendFailed, HttpResponseException,
+                ):
                     # TODO: Remember that we are now out of sync and try again
                     # later
                     logger.warn(
-                        "Failed to handle device list update for %s,"
-                        " we're not retrying the remote",
-                        user_id,
+                        "Failed to handle device list update for %s", user_id,
                     )
                     # We abort on exceptions rather than accepting the update
                     # as otherwise synapse will 'forget' that its device list
@@ -532,6 +536,25 @@ class DeviceListEduUpdater(object):
 
                 stream_id = result["stream_id"]
                 devices = result["devices"]
+
+                # If the remote server has more than ~1000 devices for this user
+                # we assume that something is going horribly wrong (e.g. a bot
+                # that logs in and creates a new device every time it tries to
+                # send a message).  Maintaining lots of devices per user in the
+                # cache can cause serious performance issues: if this request
+                # takes more than 60s to complete, internal replication from the
+                # inbound federation worker to the synapse master may time out,
+                # causing the inbound federation request to fail and the remote
+                # server to retry, resulting in a DoS.  So in this scenario we
+                # give up on storing the total list of devices and only handle
+                # the delta instead.
+                if len(devices) > 1000:
+                    logger.warn(
+                        "Ignoring device list snapshot for %s as it has >1K devs (%d)",
+                        user_id, len(devices)
+                    )
+                    devices = []
+
                 yield self.store.update_remote_device_list_cache(
                     user_id, devices, stream_id,
                 )
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 0699731c13..8b113307d2 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -57,8 +57,8 @@ class DirectoryHandler(BaseHandler):
         # general association creation for both human users and app services
 
         for wchar in string.whitespace:
-                if wchar in room_alias.localpart:
-                    raise SynapseError(400, "Invalid characters in room alias")
+            if wchar in room_alias.localpart:
+                raise SynapseError(400, "Invalid characters in room alias")
 
         if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room alias must be local")
@@ -112,7 +112,9 @@ class DirectoryHandler(BaseHandler):
                     403, "This user is not permitted to create this alias",
                 )
 
-            if not self.config.is_alias_creation_allowed(user_id, room_alias.to_string()):
+            if not self.config.is_alias_creation_allowed(
+                user_id, room_id, room_alias.to_string(),
+            ):
                 # Let's just return a generic message, as there may be all sorts of
                 # reasons why we said no. TODO: Allow configurable error messages
                 # per alias creation rule?
@@ -395,9 +397,9 @@ class DirectoryHandler(BaseHandler):
         room_id (str)
         visibility (str): "public" or "private"
         """
-        if not self.spam_checker.user_may_publish_room(
-            requester.user.to_string(), room_id
-        ):
+        user_id = requester.user.to_string()
+
+        if not self.spam_checker.user_may_publish_room(user_id, room_id):
             raise AuthError(
                 403,
                 "This user is not permitted to publish rooms to the room list"
@@ -415,7 +417,24 @@ class DirectoryHandler(BaseHandler):
 
         yield self.auth.check_can_change_room_list(room_id, requester.user)
 
-        yield self.store.set_room_is_public(room_id, visibility == "public")
+        making_public = visibility == "public"
+        if making_public:
+            room_aliases = yield self.store.get_aliases_for_room(room_id)
+            canonical_alias = yield self.store.get_canonical_alias_for_room(room_id)
+            if canonical_alias:
+                room_aliases.append(canonical_alias)
+
+            if not self.config.is_publishing_room_allowed(
+                user_id, room_id, room_aliases,
+            ):
+                # Let's just return a generic message, as there may be all sorts of
+                # reasons why we said no. TODO: Allow configurable error messages
+                # per room publication rule?
+                raise SynapseError(
+                    403, "Not allowed to publish room",
+                )
+
+        yield self.store.set_room_is_public(room_id, making_public)
 
     @defer.inlineCallbacks
     def edit_published_appservice_room_list(self, appservice_id, network_id,
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 42b040375f..7bc174070e 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -19,7 +19,13 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.api.errors import NotFoundError, RoomKeysVersionError, StoreError
+from synapse.api.errors import (
+    Codes,
+    NotFoundError,
+    RoomKeysVersionError,
+    StoreError,
+    SynapseError,
+)
 from synapse.util.async_helpers import Linearizer
 
 logger = logging.getLogger(__name__)
@@ -267,7 +273,7 @@ class E2eRoomKeysHandler(object):
             version(str): Optional; if None, gives the most recent version;
                 otherwise, a historical one.
         Raises:
-            StoreError: code 404 if the requested backup version doesn't exist
+            NotFoundError: if the requested backup version doesn't exist
         Returns:
             A deferred of an info dict that gives the info about the requested version.
 
@@ -279,7 +285,13 @@ class E2eRoomKeysHandler(object):
         """
 
         with (yield self._upload_linearizer.queue(user_id)):
-            res = yield self.store.get_e2e_room_keys_version_info(user_id, version)
+            try:
+                res = yield self.store.get_e2e_room_keys_version_info(user_id, version)
+            except StoreError as e:
+                if e.code == 404:
+                    raise NotFoundError("Unknown backup version")
+                else:
+                    raise
             defer.returnValue(res)
 
     @defer.inlineCallbacks
@@ -290,8 +302,60 @@ class E2eRoomKeysHandler(object):
             user_id(str): the user whose current backup version we're deleting
             version(str): the version id of the backup being deleted
         Raises:
-            StoreError: code 404 if this backup version doesn't exist
+            NotFoundError: if this backup version doesn't exist
         """
 
         with (yield self._upload_linearizer.queue(user_id)):
-            yield self.store.delete_e2e_room_keys_version(user_id, version)
+            try:
+                yield self.store.delete_e2e_room_keys_version(user_id, version)
+            except StoreError as e:
+                if e.code == 404:
+                    raise NotFoundError("Unknown backup version")
+                else:
+                    raise
+
+    @defer.inlineCallbacks
+    def update_version(self, user_id, version, version_info):
+        """Update the info about a given version of the user's backup
+
+        Args:
+            user_id(str): the user whose current backup version we're updating
+            version(str): the backup version we're updating
+            version_info(dict): the new information about the backup
+        Raises:
+            NotFoundError: if the requested backup version doesn't exist
+        Returns:
+            A deferred of an empty dict.
+        """
+        if "version" not in version_info:
+            raise SynapseError(
+                400,
+                "Missing version in body",
+                Codes.MISSING_PARAM
+            )
+        if version_info["version"] != version:
+            raise SynapseError(
+                400,
+                "Version in body does not match",
+                Codes.INVALID_PARAM
+            )
+        with (yield self._upload_linearizer.queue(user_id)):
+            try:
+                old_info = yield self.store.get_e2e_room_keys_version_info(
+                    user_id, version
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise NotFoundError("Unknown backup version")
+                else:
+                    raise
+            if old_info["algorithm"] != version_info["algorithm"]:
+                raise SynapseError(
+                    400,
+                    "Algorithm does not match",
+                    Codes.INVALID_PARAM
+                )
+
+            yield self.store.update_e2e_room_keys_version(user_id, version, version_info)
+
+            defer.returnValue({})
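
A sketch of the order of checks update_version enforces, with the error each one produces (the error codes follow the imports at the top of this file):

    # update_version(user_id, version, version_info) checks, in order:
    #   1. "version" present in version_info        -> 400 M_MISSING_PARAM
    #   2. version_info["version"] == version       -> 400 M_INVALID_PARAM
    #   3. a backup with that version exists        -> 404 M_NOT_FOUND
    #   4. version_info["algorithm"] is unchanged   -> 400 M_INVALID_PARAM
    # Only then is the new info written; the deferred resolves to {}.
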
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a3bb864bb2..083f2e0ac3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -34,6 +34,7 @@ from synapse.api.constants import (
     EventTypes,
     Membership,
     RejectedReason,
+    RoomVersions,
 )
 from synapse.api.errors import (
     AuthError,
@@ -43,10 +44,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
-from synapse.crypto.event_signing import (
-    add_hashes_and_signatures,
-    compute_event_signature,
-)
+from synapse.crypto.event_signing import compute_event_signature
 from synapse.events.validator import EventValidator
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
@@ -58,7 +56,6 @@ from synapse.types import UserID, get_domain_from_id
 from synapse.util import logcontext, unwrapFirstError
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room
-from synapse.util.frozenutils import unfreeze
 from synapse.util.logutils import log_function
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.visibility import filter_events_for_server
@@ -105,7 +102,7 @@ class FederationHandler(BaseHandler):
 
         self.hs = hs
 
-        self.store = hs.get_datastore()  # type: synapse.storage.DataStore
+        self.store = hs.get_datastore()
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
@@ -342,6 +339,8 @@ class FederationHandler(BaseHandler):
                             room_id, event_id, p,
                         )
 
+                        room_version = yield self.store.get_room_version(room_id)
+
                         with logcontext.nested_logging_context(p):
                             # note that if any of the missing prevs share missing state or
                             # auth events, the requests to fetch those events are deduped
@@ -355,7 +354,7 @@ class FederationHandler(BaseHandler):
                             # we want the state *after* p; get_state_for_room returns the
                             # state *before* p.
                             remote_event = yield self.federation_client.get_pdu(
-                                [origin], p, outlier=True,
+                                [origin], p, room_version, outlier=True,
                             )
 
                             if remote_event is None:
@@ -379,7 +378,6 @@ class FederationHandler(BaseHandler):
                             for x in remote_state:
                                 event_map[x.event_id] = x
 
-                    room_version = yield self.store.get_room_version(room_id)
                     state_map = yield resolve_events_with_store(
                         room_version, state_maps, event_map,
                         state_res_store=StateResolutionStore(self.store),
@@ -655,6 +653,8 @@ class FederationHandler(BaseHandler):
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
 
+        room_version = yield self.store.get_room_version(room_id)
+
         events = yield self.federation_client.backfill(
             dest,
             room_id,
@@ -748,6 +748,7 @@ class FederationHandler(BaseHandler):
                             self.federation_client.get_pdu,
                             [dest],
                             event_id,
+                            room_version=room_version,
                             outlier=True,
                             timeout=10000,
                         )
@@ -1060,7 +1061,7 @@ class FederationHandler(BaseHandler):
         """
         logger.debug("Joining %s to %s", joinee, room_id)
 
-        origin, event = yield self._make_and_verify_event(
+        origin, event, event_format_version = yield self._make_and_verify_event(
             target_hosts,
             room_id,
             joinee,
@@ -1083,7 +1084,6 @@ class FederationHandler(BaseHandler):
         handled_events = set()
 
         try:
-            event = self._sign_event(event)
             # Try the host we successfully got a response to /make_join/
             # request first.
             try:
@@ -1091,7 +1091,9 @@ class FederationHandler(BaseHandler):
                 target_hosts.insert(0, origin)
             except ValueError:
                 pass
-            ret = yield self.federation_client.send_join(target_hosts, event)
+            ret = yield self.federation_client.send_join(
+                target_hosts, event, event_format_version,
+            )
 
             origin = ret["origin"]
             state = ret["state"]
@@ -1164,13 +1166,18 @@ class FederationHandler(BaseHandler):
         """
         event_content = {"membership": Membership.JOIN}
 
-        builder = self.event_builder_factory.new({
-            "type": EventTypes.Member,
-            "content": event_content,
-            "room_id": room_id,
-            "sender": user_id,
-            "state_key": user_id,
-        })
+        room_version = yield self.store.get_room_version(room_id)
+
+        builder = self.event_builder_factory.new(
+            room_version,
+            {
+                "type": EventTypes.Member,
+                "content": event_content,
+                "room_id": room_id,
+                "sender": user_id,
+                "state_key": user_id,
+            }
+        )
 
         try:
             event, context = yield self.event_creation_handler.create_new_client_event(
@@ -1182,7 +1189,9 @@ class FederationHandler(BaseHandler):
 
         # The remote hasn't signed it yet, obviously. We'll do the full checks
         # when we get the event back in `on_send_join_request`
-        yield self.auth.check_from_context(event, context, do_sig_check=False)
+        yield self.auth.check_from_context(
+            room_version, event, context, do_sig_check=False,
+        )
 
         defer.returnValue(event)
 
@@ -1287,11 +1296,11 @@ class FederationHandler(BaseHandler):
             )
 
         event.internal_metadata.outlier = True
-        event.internal_metadata.invite_from_remote = True
+        event.internal_metadata.out_of_band_membership = True
 
         event.signatures.update(
             compute_event_signature(
-                event,
+                event.get_pdu_json(),
                 self.hs.hostname,
                 self.hs.config.signing_key[0]
             )
@@ -1304,7 +1313,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
-        origin, event = yield self._make_and_verify_event(
+        origin, event, event_format_version = yield self._make_and_verify_event(
             target_hosts,
             room_id,
             user_id,
@@ -1313,7 +1322,7 @@ class FederationHandler(BaseHandler):
         # Mark as outlier as we don't have any state for this event; we're not
         # even in the room.
         event.internal_metadata.outlier = True
-        event = self._sign_event(event)
+        event.internal_metadata.out_of_band_membership = True
 
         # Try the host that we successfully called /make_leave/ on first for
         # the /send_leave/ request.
@@ -1336,7 +1345,7 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
                                content={}, params=None):
-        origin, pdu = yield self.federation_client.make_membership_event(
+        origin, event, format_ver = yield self.federation_client.make_membership_event(
             target_hosts,
             room_id,
             user_id,
@@ -1345,9 +1354,7 @@ class FederationHandler(BaseHandler):
             params=params,
         )
 
-        logger.debug("Got response to make_%s: %s", membership, pdu)
-
-        event = pdu
+        logger.debug("Got response to make_%s: %s", membership, event)
 
         # We should assert some things.
         # FIXME: Do this in a nicer way
@@ -1355,28 +1362,7 @@ class FederationHandler(BaseHandler):
         assert(event.user_id == user_id)
         assert(event.state_key == user_id)
         assert(event.room_id == room_id)
-        defer.returnValue((origin, event))
-
-    def _sign_event(self, event):
-        event.internal_metadata.outlier = False
-
-        builder = self.event_builder_factory.new(
-            unfreeze(event.get_pdu_json())
-        )
-
-        builder.event_id = self.event_builder_factory.create_event_id()
-        builder.origin = self.hs.hostname
-
-        if not hasattr(event, "signatures"):
-            builder.signatures = {}
-
-        add_hashes_and_signatures(
-            builder,
-            self.hs.hostname,
-            self.hs.config.signing_key[0],
-        )
-
-        return builder.build()
+        defer.returnValue((origin, event, format_ver))
 
     @defer.inlineCallbacks
     @log_function
@@ -1385,13 +1371,17 @@ class FederationHandler(BaseHandler):
         leave event for the room and return that. We do *not* persist or
         process it until the other server has signed it and sent it back.
         """
-        builder = self.event_builder_factory.new({
-            "type": EventTypes.Member,
-            "content": {"membership": Membership.LEAVE},
-            "room_id": room_id,
-            "sender": user_id,
-            "state_key": user_id,
-        })
+        room_version = yield self.store.get_room_version(room_id)
+        builder = self.event_builder_factory.new(
+            room_version,
+            {
+                "type": EventTypes.Member,
+                "content": {"membership": Membership.LEAVE},
+                "room_id": room_id,
+                "sender": user_id,
+                "state_key": user_id,
+            }
+        )
 
         event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder,
@@ -1400,7 +1390,9 @@ class FederationHandler(BaseHandler):
         try:
             # The remote hasn't signed it yet, obviously. We'll do the full checks
             # when we get the event back in `on_send_leave_request`
-            yield self.auth.check_from_context(event, context, do_sig_check=False)
+            yield self.auth.check_from_context(
+                room_version, event, context, do_sig_check=False,
+            )
         except AuthError as e:
             logger.warn("Failed to create new leave %r because %s", event, e)
             raise e
@@ -1659,6 +1651,13 @@ class FederationHandler(BaseHandler):
                 create_event = e
                 break
 
+        if create_event is None:
+            # If the state doesn't have a create event then the room is
+            # invalid, and it would fail auth checks anyway.
+            raise SynapseError(400, "No create event in state")
+
+        room_version = create_event.content.get("room_version", RoomVersions.V1)
+
         missing_auth_events = set()
         for e in itertools.chain(auth_events, state, [event]):
             for e_id in e.auth_event_ids():
@@ -1669,6 +1668,7 @@ class FederationHandler(BaseHandler):
             m_ev = yield self.federation_client.get_pdu(
                 [origin],
                 e_id,
+                room_version=room_version,
                 outlier=True,
                 timeout=10000,
             )
@@ -1687,7 +1687,7 @@ class FederationHandler(BaseHandler):
                 auth_for_e[(EventTypes.Create, "")] = create_event
 
             try:
-                self.auth.check(e, auth_events=auth_for_e)
+                self.auth.check(room_version, e, auth_events=auth_for_e)
             except SynapseError as err:
                 # we may get SynapseErrors here as well as AuthErrors. For
                 # instance, there are a couple of (ancient) events in some
@@ -1931,6 +1931,8 @@ class FederationHandler(BaseHandler):
         current_state = set(e.event_id for e in auth_events.values())
         different_auth = event_auth_events - current_state
 
+        room_version = yield self.store.get_room_version(event.room_id)
+
         if different_auth and not event.internal_metadata.is_outlier():
             # Do auth conflict res.
             logger.info("Different auth: %s", different_auth)
@@ -1955,8 +1957,6 @@ class FederationHandler(BaseHandler):
                     (d.type, d.state_key): d for d in different_events if d
                 })
 
-                room_version = yield self.store.get_room_version(event.room_id)
-
                 new_state = yield self.state_handler.resolve_events(
                     room_version,
                     [list(local_view.values()), list(remote_view.values())],
@@ -2056,7 +2056,7 @@ class FederationHandler(BaseHandler):
                 )
 
         try:
-            self.auth.check(event, auth_events=auth_events)
+            self.auth.check(room_version, event, auth_events=auth_events)
         except AuthError as e:
             logger.warn("Failed auth resolution for %r because %s", event, e)
             raise e
@@ -2279,18 +2279,26 @@ class FederationHandler(BaseHandler):
         }
 
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
-            builder = self.event_builder_factory.new(event_dict)
-            EventValidator().validate_new(builder)
+            room_version = yield self.store.get_room_version(room_id)
+            builder = self.event_builder_factory.new(room_version, event_dict)
+
+            EventValidator().validate_builder(builder)
             event, context = yield self.event_creation_handler.create_new_client_event(
                 builder=builder
             )
 
             event, context = yield self.add_display_name_to_third_party_invite(
-                event_dict, event, context
+                room_version, event_dict, event, context
             )
 
+            EventValidator().validate_new(event)
+
+            # We need to tell the transaction queue to send this out, even
+            # though the sender isn't a local user.
+            event.internal_metadata.send_on_behalf_of = self.hs.hostname
+
             try:
-                yield self.auth.check_from_context(event, context)
+                yield self.auth.check_from_context(room_version, event, context)
             except AuthError as e:
                 logger.warn("Denying new third party invite %r because %s", event, e)
                 raise e
@@ -2317,23 +2325,31 @@ class FederationHandler(BaseHandler):
         Returns:
             Deferred: resolves (to None)
         """
-        builder = self.event_builder_factory.new(event_dict)
+        room_version = yield self.store.get_room_version(room_id)
+
+        # NB: event_dict has a particular specced format we might need to fudge
+        # if we change event formats too much.
+        builder = self.event_builder_factory.new(room_version, event_dict)
 
         event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder,
         )
 
         event, context = yield self.add_display_name_to_third_party_invite(
-            event_dict, event, context
+            room_version, event_dict, event, context
         )
 
         try:
-            self.auth.check_from_context(event, context)
+            self.auth.check_from_context(room_version, event, context)
         except AuthError as e:
             logger.warn("Denying third party invite %r because %s", event, e)
             raise e
         yield self._check_signature(event, context)
 
+        # We need to tell the transaction queue to send this out, even
+        # though the sender isn't a local user.
+        event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
+
         # XXX we send the invite here, but send_membership_event also sends it,
         # so we end up making two requests. I think this is redundant.
         returned_invite = yield self.send_invite(origin, event)
@@ -2344,7 +2360,8 @@ class FederationHandler(BaseHandler):
         yield member_handler.send_membership_event(None, event, context)
 
     @defer.inlineCallbacks
-    def add_display_name_to_third_party_invite(self, event_dict, event, context):
+    def add_display_name_to_third_party_invite(self, room_version, event_dict,
+                                               event, context):
         key = (
             EventTypes.ThirdPartyInvite,
             event.content["third_party_invite"]["signed"]["token"]
@@ -2368,11 +2385,12 @@ class FederationHandler(BaseHandler):
             # auth checks. If we need the invite and don't have it then the
             # auth check code will explode appropriately.
 
-        builder = self.event_builder_factory.new(event_dict)
-        EventValidator().validate_new(builder)
+        builder = self.event_builder_factory.new(room_version, event_dict)
+        EventValidator().validate_builder(builder)
         event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder,
         )
+        EventValidator().validate_new(event)
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
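
The thread running through this file's changes: event construction and event auth are now room-version-aware, so each call site resolves the version first and passes it along. Condensed from the hunks above into one sketch of the new calling convention:

    room_version = yield self.store.get_room_version(room_id)

    builder = self.event_builder_factory.new(room_version, event_dict)
    EventValidator().validate_builder(builder)

    event, context = yield self.event_creation_handler.create_new_client_event(
        builder=builder,
    )
    EventValidator().validate_new(event)

    yield self.auth.check_from_context(room_version, event, context)
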
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 173315af6c..02c508acec 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.api.errors import HttpResponseException, SynapseError
+from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
 from synapse.types import get_domain_from_id
 
 logger = logging.getLogger(__name__)
@@ -46,13 +46,19 @@ def _create_rerouter(func_name):
             # when the remote end responds with things like 403 Not
             # In Group, we can communicate that to the client instead
             # of a 500.
-            def h(failure):
+            def http_response_errback(failure):
                 failure.trap(HttpResponseException)
                 e = failure.value
                 if e.code == 403:
                     raise e.to_synapse_error()
                 return failure
-            d.addErrback(h)
+
+            def request_failed_errback(failure):
+                failure.trap(RequestSendFailed)
+                raise SynapseError(502, "Failed to contact group server")
+
+            d.addErrback(http_response_errback)
+            d.addErrback(request_failed_errback)
             return d
     return f
 
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 5feb3f22a6..39184f0e22 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -167,18 +167,21 @@ class IdentityHandler(BaseHandler):
             "mxid": mxid,
             "threepid": threepid,
         }
-        headers = {}
+
         # we abuse the federation http client to sign the request, but we have to send it
         # using the normal http client since we don't want the SRV lookup and want normal
         # 'browser-like' HTTPS.
-        self.federation_http_client.sign_request(
+        auth_headers = self.federation_http_client.build_auth_headers(
             destination=None,
             method='POST',
             url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'),
-            headers_dict=headers,
             content=content,
             destination_is=id_server,
         )
+        headers = {
+            b"Authorization": auth_headers,
+        }
+
         try:
             yield self.http_client.post_json_get_json(
                 url,
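
The change above means the signing helper now returns the value for an Authorization header rather than mutating a headers dict in place. The request that ends up on the wire looks roughly like this (host, key name, and signature are illustrative; X-Matrix is the federation auth scheme the helper produces):

    # POST https://id.example.com/_matrix/identity/api/v1/3pid/unbind
    # Authorization: X-Matrix origin=matrix.example.org,key="ed25519:a_XyZw",sig="..."
    #
    # {"mxid": "@alice:example.org", "threepid": {"medium": "email", "address": "..."}}
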
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a7cd779b02..3981fe69ce 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
 from twisted.internet import defer
 from twisted.internet.defer import succeed
 
-from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership, RoomVersions
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -31,7 +31,6 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.api.urls import ConsentURIBuilder
-from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
@@ -278,9 +277,17 @@ class EventCreationHandler(object):
         """
         yield self.auth.check_auth_blocking(requester.user.to_string())
 
-        builder = self.event_builder_factory.new(event_dict)
+        if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "":
+            room_version = event_dict["content"]["room_version"]
+        else:
+            try:
+                room_version = yield self.store.get_room_version(event_dict["room_id"])
+            except NotFoundError:
+                raise AuthError(403, "Unknown room")
 
-        self.validator.validate_new(builder)
+        builder = self.event_builder_factory.new(room_version, event_dict)
+
+        self.validator.validate_builder(builder)
 
         if builder.type == EventTypes.Member:
             membership = builder.content.get("membership", None)
@@ -318,6 +325,8 @@ class EventCreationHandler(object):
             prev_events_and_hashes=prev_events_and_hashes,
         )
 
+        self.validator.validate_new(event)
+
         defer.returnValue((event, context))
 
     def _is_exempt_from_privacy_policy(self, builder, requester):
@@ -535,40 +544,19 @@ class EventCreationHandler(object):
             prev_events_and_hashes = \
                 yield self.store.get_prev_events_for_room(builder.room_id)
 
-        if prev_events_and_hashes:
-            depth = max([d for _, _, d in prev_events_and_hashes]) + 1
-            # we cap depth of generated events, to ensure that they are not
-            # rejected by other servers (and so that they can be persisted in
-            # the db)
-            depth = min(depth, MAX_DEPTH)
-        else:
-            depth = 1
-
         prev_events = [
             (event_id, prev_hashes)
             for event_id, prev_hashes, _ in prev_events_and_hashes
         ]
 
-        builder.prev_events = prev_events
-        builder.depth = depth
-
-        context = yield self.state.compute_event_context(builder)
+        event = yield builder.build(
+            prev_event_ids=[p for p, _ in prev_events],
+        )
+        context = yield self.state.compute_event_context(event)
         if requester:
             context.app_service = requester.app_service
 
-        if builder.is_state():
-            builder.prev_state = yield self.store.add_event_hashes(
-                context.prev_state_events
-            )
-
-        yield self.auth.add_auth_events(builder, context)
-
-        signing_key = self.hs.config.signing_key[0]
-        add_hashes_and_signatures(
-            builder, self.server_name, signing_key
-        )
-
-        event = builder.build()
+        self.validator.validate_new(event)
 
         logger.debug(
             "Created event %s",
@@ -603,8 +591,13 @@ class EventCreationHandler(object):
             extra_users (list(UserID)): Any extra users to notify about event
         """
 
+        if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""):
+            room_version = event.content.get("room_version", RoomVersions.V1)
+        else:
+            room_version = yield self.store.get_room_version(event.room_id)
+
         try:
-            yield self.auth.check_from_context(event, context)
+            yield self.auth.check_from_context(room_version, event, context)
         except AuthError as err:
             logger.warn("Denying new event %r because %s", event, err)
             raise err
@@ -752,7 +745,8 @@ class EventCreationHandler(object):
             auth_events = {
                 (e.type, e.state_key): e for e in auth_events.values()
             }
-            if self.auth.check_redaction(event, auth_events=auth_events):
+            room_version = yield self.store.get_room_version(event.room_id)
+            if self.auth.check_redaction(room_version, event, auth_events=auth_events):
                 original_event = yield self.store.get_event(
                     event.redacts,
                     check_redacted=False,
@@ -766,6 +760,9 @@ class EventCreationHandler(object):
                         "You don't have permission to redact events"
                     )
 
+                # We've already checked.
+                event.internal_metadata.recheck_redaction = False
+
         if event.type == EventTypes.Create:
             prev_state_ids = yield context.get_prev_state_ids(self.store)
             if prev_state_ids:
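
The create-event special case earlier in this hunk exists because an m.room.create event is built before the room exists in the database, so the room version can only come from the event body itself. A hypothetical event_dict of the shape that branch expects (all values illustrative):

    event_dict = {
        "type": "m.room.create",
        "state_key": "",
        "room_id": "!abcdefghij:example.com",
        "sender": "@alice:example.com",
        "content": {
            "creator": "@alice:example.com",
            "room_version": "1",
        },
    }
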
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index f2be6c1185..084c1503da 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -235,6 +235,17 @@ class PaginationHandler(object):
                 "room_key", next_key
             )
 
+        if events:
+            if event_filter:
+                events = event_filter.filter(events)
+
+            events = yield filter_events_for_client(
+                self.store,
+                user_id,
+                events,
+                is_peeking=(member_event_id is None),
+            )
+
         if not events:
             defer.returnValue({
                 "chunk": [],
@@ -242,16 +253,6 @@ class PaginationHandler(object):
                 "end": next_token.to_string(),
             })
 
-        if event_filter:
-            events = event_filter.filter(events)
-
-        events = yield filter_events_for_client(
-            self.store,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
-
         state = None
         if event_filter and event_filter.lazy_load_members() and len(events) > 0:
             # TODO: remove redundant members
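
Net effect of the reordering above: the client-supplied filter and the visibility checks now run before the empty-page test, so a page whose events are all filtered out takes the early-return path rather than carrying on with an empty chunk. The resulting control flow, sketched (fetch_page is pseudocode for the token-bounded lookup above):

    events = yield fetch_page()
    if events:
        if event_filter:
            events = event_filter.filter(events)    # may empty the list
        events = yield filter_events_for_client(    # may empty it too
            self.store, user_id, events,
            is_peeking=(member_event_id is None),
        )
    if not events:
        defer.returnValue({"chunk": [], "end": next_token.to_string()})
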
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 015909bb26..24a4cb5a83 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -19,6 +19,7 @@ import logging
 from twisted.internet import defer
 
 from synapse import types
+from synapse.api.constants import LoginType
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -26,7 +27,14 @@ from synapse.api.errors import (
     RegistrationError,
     SynapseError,
 )
+from synapse.config.server import is_threepid_reserved
 from synapse.http.client import CaptchaServerHttpClient
+from synapse.http.servlet import assert_params_in_dict
+from synapse.replication.http.login import RegisterDeviceReplicationServlet
+from synapse.replication.http.register import (
+    ReplicationPostRegisterActionsServlet,
+    ReplicationRegisterServlet,
+)
 from synapse.types import RoomAlias, RoomID, UserID, create_requester
 from synapse.util.async_helpers import Linearizer
 from synapse.util.threepids import check_3pid_allowed
@@ -51,6 +59,7 @@ class RegistrationHandler(BaseHandler):
         self.profile_handler = hs.get_profile_handler()
         self.user_directory_handler = hs.get_user_directory_handler()
         self.captcha_client = CaptchaServerHttpClient(hs)
+        self.identity_handler = self.hs.get_handlers().identity_handler
 
         self._next_generated_user_id = None
 
@@ -61,6 +70,18 @@ class RegistrationHandler(BaseHandler):
         )
         self._server_notices_mxid = hs.config.server_notices_mxid
 
+        if hs.config.worker_app:
+            self._register_client = ReplicationRegisterServlet.make_client(hs)
+            self._register_device_client = (
+                RegisterDeviceReplicationServlet.make_client(hs)
+            )
+            self._post_registration_client = (
+                ReplicationPostRegisterActionsServlet.make_client(hs)
+            )
+        else:
+            self.device_handler = hs.get_device_handler()
+            self.pusher_pool = hs.get_pusherpool()
+
     @defer.inlineCallbacks
     def check_username(self, localpart, guest_access_token=None,
                        assigned_user_id=None):
@@ -126,6 +147,8 @@ class RegistrationHandler(BaseHandler):
         make_guest=False,
         admin=False,
         threepid=None,
+        user_type=None,
+        default_display_name=None,
     ):
         """Registers a new client on the server.
 
@@ -140,6 +163,10 @@ class RegistrationHandler(BaseHandler):
               since it offers no means of associating a device_id with the
               access_token. Instead you should call auth_handler.issue_access_token
               after registration.
+            user_type (str|None): type of user. One of the values from
+              api.constants.UserTypes, or None for a normal user.
+            default_display_name (unicode|None): if set, the new user's displayname
+              will be set to this. Defaults to 'localpart'.
         Returns:
             A tuple of (user_id, access_token).
         Raises:
@@ -149,7 +176,7 @@ class RegistrationHandler(BaseHandler):
         yield self.auth.check_auth_blocking(threepid=threepid)
         password_hash = None
         if password:
-            password_hash = yield self.auth_handler().hash(password)
+            password_hash = yield self._auth_handler.hash(password)
 
         if localpart:
             yield self.check_username(localpart, guest_access_token=guest_access_token)
@@ -169,20 +196,25 @@ class RegistrationHandler(BaseHandler):
             user = UserID(localpart, self.hs.hostname)
             user_id = user.to_string()
 
+            if was_guest:
+                # If the user was a guest then they already have a profile
+                default_display_name = None
+
+            elif default_display_name is None:
+                default_display_name = localpart
+
             token = None
             if generate_token:
                 token = self.macaroon_gen.generate_access_token(user_id)
-            yield self.store.register(
+            yield self._register_with_store(
                 user_id=user_id,
                 token=token,
                 password_hash=password_hash,
                 was_guest=was_guest,
                 make_guest=make_guest,
-                create_profile_with_localpart=(
-                    # If the user was a guest then they already have a profile
-                    None if was_guest else user.localpart
-                ),
+                create_profile_with_displayname=default_display_name,
                 admin=admin,
+                user_type=user_type,
             )
 
             if self.hs.config.user_directory_search_all_users:
@@ -203,13 +235,15 @@ class RegistrationHandler(BaseHandler):
                 yield self.check_user_id_not_appservice_exclusive(user_id)
                 if generate_token:
                     token = self.macaroon_gen.generate_access_token(user_id)
+                if default_display_name is None:
+                    default_display_name = localpart
                 try:
-                    yield self.store.register(
+                    yield self._register_with_store(
                         user_id=user_id,
                         token=token,
                         password_hash=password_hash,
                         make_guest=make_guest,
-                        create_profile_with_localpart=user.localpart,
+                        create_profile_with_displayname=default_display_name,
                     )
                 except SynapseError:
                     # if user id is taken, just generate another
@@ -233,9 +267,16 @@ class RegistrationHandler(BaseHandler):
         # auto-join the user to any rooms we're supposed to dump them into
         fake_requester = create_requester(user_id)
 
-        # try to create the room if we're the first user on the server
+        # try to create the room if we're the first real user on the server. Note
+        # that an auto-generated support user is not a real user and will never be
+        # the user to create the room
         should_auto_create_rooms = False
-        if self.hs.config.autocreate_auto_join_rooms:
+        is_support = yield self.store.is_support_user(user_id)
+        # There is an edge case where the first user is the support user: the
+        # room is then never created. This seems unlikely, and is recoverable
+        # anyway given that a support user was involved in the first place.
+        if self.hs.config.autocreate_auto_join_rooms and not is_support:
             count = yield self.store.count_all_users()
             should_auto_create_rooms = count == 1
         for r in self.hs.config.auto_join_rooms:
@@ -296,11 +337,11 @@ class RegistrationHandler(BaseHandler):
             user_id, allowed_appservice=service
         )
 
-        yield self.store.register(
+        yield self._register_with_store(
             user_id=user_id,
             password_hash="",
             appservice_id=service_id,
-            create_profile_with_localpart=user.localpart,
+            create_profile_with_displayname=user.localpart,
         )
         defer.returnValue(user_id)
 
@@ -328,35 +369,6 @@ class RegistrationHandler(BaseHandler):
             logger.info("Valid captcha entered from %s", ip)
 
     @defer.inlineCallbacks
-    def register_saml2(self, localpart):
-        """
-        Registers email_id as SAML2 Based Auth.
-        """
-        if types.contains_invalid_mxid_characters(localpart):
-            raise SynapseError(
-                400,
-                "User ID can only contain characters a-z, 0-9, or '=_-./'",
-            )
-        yield self.auth.check_auth_blocking()
-        user = UserID(localpart, self.hs.hostname)
-        user_id = user.to_string()
-
-        yield self.check_user_id_not_appservice_exclusive(user_id)
-        token = self.macaroon_gen.generate_access_token(user_id)
-        try:
-            yield self.store.register(
-                user_id=user_id,
-                token=token,
-                password_hash=None,
-                create_profile_with_localpart=user.localpart,
-            )
-        except Exception as e:
-            yield self.store.add_access_token_to_user(user_id, token)
-            # Ignore Registration errors
-            logger.exception(e)
-        defer.returnValue((user_id, token))
-
-    @defer.inlineCallbacks
     def register_email(self, threepidCreds):
         """
         Registers emails with an identity server.
@@ -368,8 +380,7 @@ class RegistrationHandler(BaseHandler):
             logger.info("validating threepidcred sid %s on id server %s",
                         c['sid'], c['idServer'])
             try:
-                identity_handler = self.hs.get_handlers().identity_handler
-                threepid = yield identity_handler.threepid_from_creds(c)
+                threepid = yield self.identity_handler.threepid_from_creds(c)
             except Exception:
                 logger.exception("Couldn't validate 3pid")
                 raise RegistrationError(400, "Couldn't validate 3pid")
@@ -393,9 +404,8 @@ class RegistrationHandler(BaseHandler):
 
         # Now we have a matrix ID, bind it to the threepids we were given
         for c in threepidCreds:
-            identity_handler = self.hs.get_handlers().identity_handler
             # XXX: This should be a deferred list, shouldn't it?
-            yield identity_handler.bind_threepid(c, user_id)
+            yield self.identity_handler.bind_threepid(c, user_id)
 
     def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
         # don't allow people to register the server notices mxid
@@ -503,11 +513,11 @@ class RegistrationHandler(BaseHandler):
         token = self.macaroon_gen.generate_access_token(user_id)
 
         if need_register:
-            yield self.store.register(
+            yield self._register_with_store(
                 user_id=user_id,
                 token=token,
                 password_hash=password_hash,
-                create_profile_with_localpart=user.localpart,
+                create_profile_with_displayname=user.localpart,
             )
         else:
             yield self._auth_handler.delete_access_tokens_for_user(user_id)
@@ -521,9 +531,6 @@ class RegistrationHandler(BaseHandler):
 
         defer.returnValue((user_id, token))
 
-    def auth_handler(self):
-        return self.hs.get_auth_handler()
-
     @defer.inlineCallbacks
     def get_or_register_3pid_guest(self, medium, address, inviter_user_id):
         """Get a guest access token for a 3PID, creating a guest account if
@@ -582,3 +589,275 @@ class RegistrationHandler(BaseHandler):
             action="join",
             ratelimit=False,
         )
+
+    def _register_with_store(self, user_id, token=None, password_hash=None,
+                             was_guest=False, make_guest=False, appservice_id=None,
+                             create_profile_with_displayname=None, admin=False,
+                             user_type=None):
+        """Register user in the datastore.
+
+        Args:
+            user_id (str): The desired user ID to register.
+            token (str): The desired access token to use for this user. If this
+                is not None, the given access token is associated with the user
+                id.
+            password_hash (str|None): Optional. The password hash for this user.
+            was_guest (bool): Optional. Whether this is a guest account being
+                upgraded to a non-guest account.
+            make_guest (boolean): True if the new user should be a guest,
+                false to add a regular user account.
+            appservice_id (str|None): The ID of the appservice registering the user.
+            create_profile_with_displayname (unicode|None): Optionally create a
+                profile for the user, setting their displayname to the given value
+            admin (boolean): Whether to register the user as an admin.
+            user_type (str|None): type of user. One of the values from
+                api.constants.UserTypes, or None for a normal user.
+
+        Returns:
+            Deferred
+        """
+        if self.hs.config.worker_app:
+            return self._register_client(
+                user_id=user_id,
+                token=token,
+                password_hash=password_hash,
+                was_guest=was_guest,
+                make_guest=make_guest,
+                appservice_id=appservice_id,
+                create_profile_with_displayname=create_profile_with_displayname,
+                admin=admin,
+                user_type=user_type,
+            )
+        else:
+            return self.store.register(
+                user_id=user_id,
+                token=token,
+                password_hash=password_hash,
+                was_guest=was_guest,
+                make_guest=make_guest,
+                appservice_id=appservice_id,
+                create_profile_with_displayname=create_profile_with_displayname,
+                admin=admin,
+                user_type=user_type,
+            )
+
+    @defer.inlineCallbacks
+    def register_device(self, user_id, device_id, initial_display_name,
+                        is_guest=False):
+        """Register a device for a user and generate an access token.
+
+        Args:
+            user_id (str): full canonical @user:id
+            device_id (str|None): The device ID to check, or None to generate
+                a new one.
+            initial_display_name (str|None): An optional display name for the
+                device.
+            is_guest (bool): Whether this is a guest account
+
+        Returns:
+            defer.Deferred[tuple[str, str]]: Tuple of device ID and access token
+        """
+
+        if self.hs.config.worker_app:
+            r = yield self._register_device_client(
+                user_id=user_id,
+                device_id=device_id,
+                initial_display_name=initial_display_name,
+                is_guest=is_guest,
+            )
+            defer.returnValue((r["device_id"], r["access_token"]))
+        else:
+            device_id = yield self.device_handler.check_device_registered(
+                user_id, device_id, initial_display_name
+            )
+            if is_guest:
+                access_token = self.macaroon_gen.generate_access_token(
+                    user_id, ["guest = true"]
+                )
+            else:
+                access_token = yield self._auth_handler.get_access_token_for_user_id(
+                    user_id, device_id=device_id,
+                )
+
+            defer.returnValue((device_id, access_token))
+
+    @defer.inlineCallbacks
+    def post_registration_actions(self, user_id, auth_result, access_token,
+                                  bind_email, bind_msisdn):
+        """A user has completed registration
+
+        Args:
+            user_id (str): The user ID of the newly registered user
+            auth_result (dict): The authenticated credentials of the newly
+                registered user.
+            access_token (str|None): The access token of the newly logged in
+                device, or None if `inhibit_login` is enabled.
+            bind_email (bool): Whether to bind the email with the identity
+                server
+            bind_msisdn (bool): Whether to bind the msisdn with the identity
+                server
+        """
+        if self.hs.config.worker_app:
+            yield self._post_registration_client(
+                user_id=user_id,
+                auth_result=auth_result,
+                access_token=access_token,
+                bind_email=bind_email,
+                bind_msisdn=bind_msisdn,
+            )
+            return
+
+        if auth_result and LoginType.EMAIL_IDENTITY in auth_result:
+            threepid = auth_result[LoginType.EMAIL_IDENTITY]
+            # Necessary due to auth checks prior to the threepid being
+            # written to the db
+            if is_threepid_reserved(
+                self.hs.config.mau_limits_reserved_threepids, threepid
+            ):
+                yield self.store.upsert_monthly_active_user(user_id)
+
+            yield self._register_email_threepid(
+                user_id, threepid, access_token,
+                bind_email,
+            )
+
+        if auth_result and LoginType.MSISDN in auth_result:
+            threepid = auth_result[LoginType.MSISDN]
+            yield self._register_msisdn_threepid(
+                user_id, threepid, bind_msisdn,
+            )
+
+        if auth_result and LoginType.TERMS in auth_result:
+            yield self._on_user_consented(
+                user_id, self.hs.config.user_consent_version,
+            )
+
+    @defer.inlineCallbacks
+    def _on_user_consented(self, user_id, consent_version):
+        """A user consented to the terms on registration
+
+        Args:
+            user_id (str): The user ID that consented
+            consent_version (str): version of the policy the user has
+                consented to.
+        """
+        logger.info("%s has consented to the privacy policy", user_id)
+        yield self.store.user_set_consent_version(
+            user_id, consent_version,
+        )
+        yield self.post_consent_actions(user_id)
+
+    @defer.inlineCallbacks
+    def _register_email_threepid(self, user_id, threepid, token, bind_email):
+        """Add an email address as a 3pid identifier
+
+        Also adds an email pusher for the email address, if configured in the
+        HS config
+
+        Also optionally binds emails to the given user_id on the identity server
+
+        Must be called on master.
+
+        Args:
+            user_id (str): id of user
+            threepid (object): m.login.email.identity auth response
+            token (str|None): access_token for the user, or None if not logged
+                in.
+            bind_email (bool): true if the client requested the email to be
+                bound at the identity server
+        Returns:
+            defer.Deferred:
+        """
+        reqd = ('medium', 'address', 'validated_at')
+        if any(x not in threepid for x in reqd):
+            # This will only happen if the ID server returns a malformed response
+            logger.info("Can't add incomplete 3pid")
+            return
+
+        yield self._auth_handler.add_threepid(
+            user_id,
+            threepid['medium'],
+            threepid['address'],
+            threepid['validated_at'],
+        )
+
+        # And we add an email pusher for them by default, but only
+        # if email notifications are enabled (so people don't start
+        # getting mail spam where they weren't before if email
+        # notifs are set up on a homeserver)
+        if (self.hs.config.email_enable_notifs and
+                self.hs.config.email_notif_for_new_users
+                and token):
+            # Pull the ID of the access token back out of the db
+            # It would really make more sense for this to be passed
+            # up when the access token is saved, but that's quite an
+            # invasive change I'd rather do separately.
+            user_tuple = yield self.store.get_user_by_access_token(
+                token
+            )
+            token_id = user_tuple["token_id"]
+
+            yield self.pusher_pool.add_pusher(
+                user_id=user_id,
+                access_token=token_id,
+                kind="email",
+                app_id="m.email",
+                app_display_name="Email Notifications",
+                device_display_name=threepid["address"],
+                pushkey=threepid["address"],
+                lang=None,  # We don't know a user's language here
+                data={},
+            )
+
+        if bind_email:
+            logger.info("bind_email specified: binding")
+            logger.debug("Binding emails %s to %s" % (
+                threepid, user_id
+            ))
+            yield self.identity_handler.bind_threepid(
+                threepid['threepid_creds'], user_id
+            )
+        else:
+            logger.info("bind_email not specified: not binding email")
+
+    @defer.inlineCallbacks
+    def _register_msisdn_threepid(self, user_id, threepid, bind_msisdn):
+        """Add a phone number as a 3pid identifier
+
+        Also optionally binds msisdn to the given user_id on the identity server
+
+        Must be called on master.
+
+        Args:
+            user_id (str): id of user
+            threepid (object): m.login.msisdn auth response
+            bind_msisdn (bool): true if the client requested the msisdn to be
+                bound at the identity server
+        Returns:
+            defer.Deferred:
+        """
+        try:
+            assert_params_in_dict(threepid, ['medium', 'address', 'validated_at'])
+        except SynapseError as ex:
+            if ex.errcode == Codes.MISSING_PARAM:
+                # This will only happen if the ID server returns a malformed response
+                logger.info("Can't add incomplete 3pid")
+                defer.returnValue(None)
+            raise
+
+        yield self._auth_handler.add_threepid(
+            user_id,
+            threepid['medium'],
+            threepid['address'],
+            threepid['validated_at'],
+        )
+
+        if bind_msisdn:
+            logger.info("bind_msisdn specified: binding")
+            logger.debug("Binding msisdn %s to %s", threepid, user_id)
+            yield self.identity_handler.bind_threepid(
+                threepid['threepid_creds'], user_id
+            )
+        else:
+            logger.info("bind_msisdn not specified: not binding msisdn")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3928faa6e7..67b15697fd 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -123,9 +123,12 @@ class RoomCreationHandler(BaseHandler):
                     token_id=requester.access_token_id,
                 )
             )
-            yield self.auth.check_from_context(tombstone_event, tombstone_context)
+            old_room_version = yield self.store.get_room_version(old_room_id)
+            yield self.auth.check_from_context(
+                old_room_version, tombstone_event, tombstone_context,
+            )
 
-            yield self.clone_exiting_room(
+            yield self.clone_existing_room(
                 requester,
                 old_room_id=old_room_id,
                 new_room_id=new_room_id,
@@ -230,7 +233,7 @@ class RoomCreationHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def clone_exiting_room(
+    def clone_existing_room(
             self, requester, old_room_id, new_room_id, new_room_version,
             tombstone_event_id,
     ):
@@ -260,8 +263,19 @@ class RoomCreationHandler(BaseHandler):
             }
         }
 
+        # Check if old room was non-federatable
+
+        # Get old room's create event
+        old_room_create_event = yield self.store.get_create_event_for_room(old_room_id)
+
+        # Check if the create event specified a non-federatable room
+        if not old_room_create_event.content.get("m.federate", True):
+            # If so, mark the new room as non-federatable as well
+            creation_content["m.federate"] = False
+
         initial_state = dict()
 
+        # Copy over relevant room state events from the old room
         types_to_copy = (
             (EventTypes.JoinRules, ""),
             (EventTypes.Name, ""),
@@ -269,6 +283,8 @@ class RoomCreationHandler(BaseHandler):
             (EventTypes.RoomHistoryVisibility, ""),
             (EventTypes.GuestAccess, ""),
             (EventTypes.RoomAvatar, ""),
+            (EventTypes.Encryption, ""),
+            (EventTypes.ServerACL, ""),
         )
 
         old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@@ -295,6 +311,28 @@ class RoomCreationHandler(BaseHandler):
             creation_content=creation_content,
         )
 
+        # Transfer membership events
+        old_room_member_state_ids = yield self.store.get_filtered_current_state_ids(
+            old_room_id, StateFilter.from_types([(EventTypes.Member, None)]),
+        )
+
+        # map from event_id to BaseEvent
+        old_room_member_state_events = yield self.store.get_events(
+            old_room_member_state_ids.values(),
+        )
+        for k, old_event in iteritems(old_room_member_state_events):
+            # Only transfer ban events
+            if ("membership" in old_event.content and
+                    old_event.content["membership"] == "ban"):
+                yield self.room_member_handler.update_membership(
+                    requester,
+                    UserID.from_string(old_event['state_key']),
+                    new_room_id,
+                    "ban",
+                    ratelimit=False,
+                    content=old_event.content,
+                )
+
         # XXX invites/joins
         # XXX 3pid invites
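The hunk above threads the old room's federation opt-out into the new create event. A short sketch of just that step, assuming the create content is shaped as in `clone_existing_room` (helper name and `room_version` key usage are ours):

def build_creation_content(old_create_content, new_room_version,
                           old_room_id, tombstone_event_id):
    creation_content = {
        "room_version": new_room_version,
        "predecessor": {
            "room_id": old_room_id,
            "event_id": tombstone_event_id,
        },
    }
    # m.federate defaults to True when absent, so only an explicit
    # opt-out in the old room needs copying across.
    if not old_create_content.get("m.federate", True):
        creation_content["m.federate"] = False
    return creation_content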
 
@@ -433,7 +471,7 @@ class RoomCreationHandler(BaseHandler):
         """
         user_id = requester.user.to_string()
 
-        self.auth.check_auth_blocking(user_id)
+        yield self.auth.check_auth_blocking(user_id)
 
         if not self.spam_checker.user_may_create_room(user_id):
             raise SynapseError(403, "You are not permitted to create rooms")
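Note the added `yield` on `check_auth_blocking`: under `defer.inlineCallbacks`, calling a Deferred-returning function without yielding it silently discards any failure it produces. A self-contained illustration of the pitfall:

from twisted.internet import defer

@defer.inlineCallbacks
def check(ok):
    if not ok:
        raise RuntimeError("blocked")
    yield defer.succeed(None)

@defer.inlineCallbacks
def broken():
    check(False)               # Deferred dropped: the failure is swallowed
    yield defer.succeed(None)  # carries on as if the check had passed

@defer.inlineCallbacks
def fixed():
    yield check(False)         # the failure now propagates to the caller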
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index dc88620885..13e212d669 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -73,8 +73,14 @@ class RoomListHandler(BaseHandler):
             # We explicitly don't bother caching searches or requests for
             # appservice specific lists.
             logger.info("Bypassing cache as search request.")
+
+            # XXX: Quick hack to stop room directory queries taking too long.
+            # Timeout request after 60s. Probably want a more fundamental
+            # solution at some point
+            timeout = self.clock.time() + 60
             return self._get_public_room_list(
-                limit, since_token, search_filter, network_tuple=network_tuple,
+                limit, since_token, search_filter,
+                network_tuple=network_tuple, timeout=timeout,
             )
 
         key = (limit, since_token, network_tuple)
@@ -87,7 +93,8 @@ class RoomListHandler(BaseHandler):
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
                               search_filter=None,
-                              network_tuple=EMPTY_THIRD_PARTY_ID,):
+                              network_tuple=EMPTY_THIRD_PARTY_ID,
+                              timeout=None,):
         if since_token and since_token != "END":
             since_token = RoomListNextBatch.from_token(since_token)
         else:
@@ -202,6 +209,9 @@ class RoomListHandler(BaseHandler):
 
         chunk = []
         for i in range(0, len(rooms_to_scan), step):
+            if timeout and self.clock.time() > timeout:
+                raise Exception("Timed out searching room directory")
+
             batch = rooms_to_scan[i:i + step]
             logger.info("Processing %i rooms for result", len(batch))
             yield concurrently_execute(
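The timeout above is an absolute deadline taken before the scan starts and re-checked once per batch. The same pattern in isolation, using time.time() in place of Synapse's clock (names here are illustrative):

import time

def process_in_batches(items, handle_batch, step=10, timeout_s=60):
    # Absolute deadline, mirroring the room directory hack above.
    deadline = time.time() + timeout_s
    for i in range(0, len(items), step):
        if time.time() > deadline:
            raise Exception("Timed out processing batches")
        handle_batch(items[i:i + step])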
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 07fd3e82fc..190ea2c7b1 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -61,9 +61,9 @@ class RoomMemberHandler(object):
 
         self.federation_handler = hs.get_handlers().federation_handler
         self.directory_handler = hs.get_handlers().directory_handler
-        self.registration_handler = hs.get_handlers().registration_handler
+        self.registration_handler = hs.get_registration_handler()
         self.profile_handler = hs.get_profile_handler()
-        self.event_creation_hander = hs.get_event_creation_handler()
+        self.event_creation_handler = hs.get_event_creation_handler()
 
         self.member_linearizer = Linearizer(name="member")
 
@@ -161,6 +161,8 @@ class RoomMemberHandler(object):
         ratelimit=True,
         content=None,
     ):
+        user_id = target.to_string()
+
         if content is None:
             content = {}
 
@@ -168,14 +170,14 @@ class RoomMemberHandler(object):
         if requester.is_guest:
             content["kind"] = "guest"
 
-        event, context = yield self.event_creation_hander.create_event(
+        event, context = yield self.event_creation_handler.create_event(
             requester,
             {
                 "type": EventTypes.Member,
                 "content": content,
                 "room_id": room_id,
                 "sender": requester.user.to_string(),
-                "state_key": target.to_string(),
+                "state_key": user_id,
 
                 # For backwards compatibility:
                 "membership": membership,
@@ -186,14 +188,14 @@ class RoomMemberHandler(object):
         )
 
         # Check if this event matches the previous membership event for the user.
-        duplicate = yield self.event_creation_hander.deduplicate_state_event(
+        duplicate = yield self.event_creation_handler.deduplicate_state_event(
             event, context,
         )
         if duplicate is not None:
             # Discard the new event since this membership change is a no-op.
             defer.returnValue(duplicate)
 
-        yield self.event_creation_hander.handle_new_client_event(
+        yield self.event_creation_handler.handle_new_client_event(
             requester,
             event,
             context,
@@ -204,12 +206,12 @@ class RoomMemberHandler(object):
         prev_state_ids = yield context.get_prev_state_ids(self.store)
 
         prev_member_event_id = prev_state_ids.get(
-            (EventTypes.Member, target.to_string()),
+            (EventTypes.Member, user_id),
             None
         )
 
         if event.membership == Membership.JOIN:
-            # Only fire user_joined_room if the user has acutally joined the
+            # Only fire user_joined_room if the user has actually joined the
             # room. Don't bother if the user is just changing their profile
             # info.
             newly_joined = True
@@ -218,6 +220,18 @@ class RoomMemberHandler(object):
                 newly_joined = prev_member_event.membership != Membership.JOIN
             if newly_joined:
                 yield self._user_joined_room(target, room_id)
+
+            # Copy over direct message status and room tags if this is a join
+            # on an upgraded room
+
+            # Check if this is an upgraded room
+            predecessor = yield self.store.get_room_predecessor(room_id)
+
+            if predecessor:
+                # It is an upgraded room. Copy over old tags
+                yield self.copy_room_tags_and_direct_to_room(
+                    predecessor["room_id"], room_id, user_id,
+                )
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -227,6 +241,55 @@ class RoomMemberHandler(object):
         defer.returnValue(event)
 
     @defer.inlineCallbacks
+    def copy_room_tags_and_direct_to_room(
+        self,
+        old_room_id,
+        new_room_id,
+        user_id,
+    ):
+        """Copies the tags and direct room state from one room to another.
+
+        Args:
+            old_room_id (str): The room ID of the old, predecessor room
+            new_room_id (str): The room ID of the new, upgraded room
+            user_id (str): The user whose tags and m.direct entries to copy
+
+        Returns:
+            Deferred[None]
+        """
+        # Retrieve user account data for predecessor room
+        user_account_data, _ = yield self.store.get_account_data_for_user(
+            user_id,
+        )
+
+        # Copy direct message state if applicable
+        direct_rooms = user_account_data.get("m.direct", {})
+
+        # Check which key this room is under
+        if isinstance(direct_rooms, dict):
+            for key, room_id_list in direct_rooms.items():
+                if old_room_id in room_id_list and new_room_id not in room_id_list:
+                    # Add new room_id to this key
+                    direct_rooms[key].append(new_room_id)
+
+                    # Save back to user's m.direct account data
+                    yield self.store.add_account_data_for_user(
+                        user_id, "m.direct", direct_rooms,
+                    )
+                    break
+
+        # Copy room tags if applicable
+        room_tags = yield self.store.get_tags_for_room(
+            user_id, old_room_id,
+        )
+
+        # Copy each room tag to the new room
+        for tag, tag_content in room_tags.items():
+            yield self.store.add_tag_to_room(
+                user_id, new_room_id, tag, tag_content
+            )
+
+    @defer.inlineCallbacks
     def update_membership(
             self,
             requester,
@@ -493,7 +556,7 @@ class RoomMemberHandler(object):
         else:
             requester = synapse.types.create_requester(target_user)
 
-        prev_event = yield self.event_creation_hander.deduplicate_state_event(
+        prev_event = yield self.event_creation_handler.deduplicate_state_event(
             event, context,
         )
         if prev_event is not None:
@@ -513,7 +576,7 @@ class RoomMemberHandler(object):
             if is_blocked:
                 raise SynapseError(403, "This room has been blocked on this server")
 
-        yield self.event_creation_hander.handle_new_client_event(
+        yield self.event_creation_handler.handle_new_client_event(
             requester,
             event,
             context,
@@ -527,7 +590,7 @@ class RoomMemberHandler(object):
         )
 
         if event.membership == Membership.JOIN:
-            # Only fire user_joined_room if the user has acutally joined the
+            # Only fire user_joined_room if the user has actually joined the
             # room. Don't bother if the user is just changing their profile
             # info.
             newly_joined = True
@@ -755,7 +818,7 @@ class RoomMemberHandler(object):
             )
         )
 
-        yield self.event_creation_hander.create_and_send_nonmember_event(
+        yield self.event_creation_handler.create_and_send_nonmember_event(
             requester,
             {
                 "type": EventTypes.ThirdPartyInvite,
@@ -877,7 +940,8 @@ class RoomMemberHandler(object):
         # first member event?
         create_event_id = current_state_ids.get(("m.room.create", ""))
         if len(current_state_ids) == 1 and create_event_id:
-            defer.returnValue(self.hs.is_mine_id(create_event_id))
+            # We can only get here if we're in the process of creating the room
+            defer.returnValue(True)
 
         for etype, state_key in current_state_ids:
             if etype != EventTypes.Member or not self.hs.is_mine_id(state_key):
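The `copy_room_tags_and_direct_to_room` helper added above treats `m.direct` account data as a map from user ID to a list of room IDs and appends the upgraded room alongside its predecessor. A sketch of just the dict manipulation, on plain data:

def add_upgraded_room_to_direct(direct_rooms, old_room_id, new_room_id):
    """direct_rooms mirrors m.direct: {user_id: [room_id, ...]}."""
    if not isinstance(direct_rooms, dict):
        return direct_rooms  # malformed account data; leave untouched
    for room_id_list in direct_rooms.values():
        if old_room_id in room_id_list and new_room_id not in room_id_list:
            # Keep the DM marker pointing at both generations of the room.
            room_id_list.append(new_room_id)
            break
    return direct_rooms

# add_upgraded_room_to_direct(
#     {"@friend:example.org": ["!old:example.org"]},
#     "!old:example.org", "!new:example.org",
# ) now also lists "!new:example.org" for @friend.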
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index ec936bbb4e..49c439313e 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -38,6 +38,41 @@ class SearchHandler(BaseHandler):
         super(SearchHandler, self).__init__(hs)
 
     @defer.inlineCallbacks
+    def get_old_rooms_from_upgraded_room(self, room_id):
+        """Retrieves room IDs of old rooms in the history of an upgraded room.
+
+        We do so by checking the m.room.create event of the room for a
+        `predecessor` key. If it exists, we add the room ID to our return
+        list and then check that room for an m.room.create event, and so on
+        until we can no longer find any more previous rooms.
+
+        The full list of all found rooms is then returned.
+
+        Args:
+            room_id (str): id of the room to search through.
+
+        Returns:
+            Deferred[iterable[unicode]]: predecessor room ids
+        """
+
+        historical_room_ids = []
+
+        while True:
+            predecessor = yield self.store.get_room_predecessor(room_id)
+
+            # If no predecessor, assume we've hit a dead end
+            if not predecessor:
+                break
+
+            # Add predecessor's room ID
+            historical_room_ids.append(predecessor["room_id"])
+
+            # Scan through the old room for further predecessors
+            room_id = predecessor["room_id"]
+
+        defer.returnValue(historical_room_ids)
+
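get_old_rooms_from_upgraded_room simply follows `predecessor` pointers until none remain. The same walk over plain data; the hop cap below is an assumption (the handler itself trusts the chain to terminate):

def walk_predecessors(room_id, get_predecessor, max_hops=100):
    """get_predecessor(room_id) -> {"room_id": ...} or None."""
    historical_room_ids = []
    for _ in range(max_hops):  # guards against a predecessor cycle
        predecessor = get_predecessor(room_id)
        if not predecessor:
            break
        historical_room_ids.append(predecessor["room_id"])
        room_id = predecessor["room_id"]
    return historical_room_ids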
+    @defer.inlineCallbacks
     def search(self, user, content, batch=None):
         """Performs a full text search for a user.
 
@@ -137,6 +172,18 @@ class SearchHandler(BaseHandler):
         )
         room_ids = set(r.room_id for r in rooms)
 
+        # If searching a subset of all rooms, check if any of the rooms
+        # are from an upgraded room, and search their contents as well
+        if search_filter.rooms:
+            historical_room_ids = []
+            for room_id in search_filter.rooms:
+                # Add any previous rooms to the search if they exist
+                ids = yield self.get_old_rooms_from_upgraded_room(room_id)
+                historical_room_ids += ids
+
+            # Prevent any historical events from being filtered
+            search_filter = search_filter.with_room_ids(historical_room_ids)
+
         room_ids = search_filter.filter_rooms(room_ids)
 
         if batch_group == "room_id":
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 09739f2862..bd97241ab4 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -895,14 +895,17 @@ class SyncHandler(object):
         Returns:
             Deferred(SyncResult)
         """
-        logger.info("Calculating sync response for %r", sync_config.user)
-
         # NB: The now_token gets changed by some of the generate_sync_* methods,
         # this is due to some of the underlying streams not supporting the ability
         # to query up to a given point.
         # Always use the `now_token` in `SyncResultBuilder`
         now_token = yield self.event_sources.get_current_token()
 
+        logger.info(
+            "Calculating sync response for %r between %s and %s",
+            sync_config.user, since_token, now_token,
+        )
+
         user_id = sync_config.user.to_string()
         app_service = self.store.get_app_service_by_user_id(user_id)
         if app_service:
@@ -1390,6 +1393,12 @@ class SyncHandler(object):
         room_entries = []
         invited = []
         for room_id, events in iteritems(mem_change_events_by_room_id):
+            logger.info(
+                "Membership changes in %s: [%s]",
+                room_id,
+                ", ".join(("%s (%s)" % (e.event_id, e.membership) for e in events)),
+            )
+
             non_joins = [e for e in events if e.membership != Membership.JOIN]
             has_join = len(non_joins) != len(events)
 
@@ -1473,10 +1482,22 @@ class SyncHandler(object):
                 if since_token and since_token.is_after(leave_token):
                     continue
 
+                # If this is an out of band message, like a remote invite
+                # rejection, we include it in the recents batch. Otherwise, we
+                # let _load_filtered_recents handle fetching the correct
+                # batches.
+                #
+                # This is all screaming out for a refactor, as the logic here is
+                # subtle and the moving parts numerous.
+                if leave_event.internal_metadata.is_out_of_band_membership():
+                    batch_events = [leave_event]
+                else:
+                    batch_events = None
+
                 room_entries.append(RoomSyncResultBuilder(
                     room_id=room_id,
                     rtype="archived",
-                    events=None,
+                    events=batch_events,
                     newly_joined=room_id in newly_joined_rooms,
                     full_state=False,
                     since_token=since_token,
@@ -1668,13 +1689,17 @@ class SyncHandler(object):
                 "content": content,
             })
 
-        account_data = sync_config.filter_collection.filter_room_account_data(
+        account_data_events = sync_config.filter_collection.filter_room_account_data(
             account_data_events
         )
 
         ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
 
-        if not (always_include or batch or account_data or ephemeral or full_state):
+        if not (always_include
+                or batch
+                or account_data_events
+                or ephemeral
+                or full_state):
             return
 
         state = yield self.compute_state_delta(
@@ -1745,7 +1770,7 @@ class SyncHandler(object):
                 room_id=room_id,
                 timeline=batch,
                 state=state,
-                account_data=account_data,
+                account_data=account_data_events,
             )
             if room_sync or always_include:
                 sync_result_builder.archived.append(room_sync)
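The archived-room branch above special-cases out-of-band memberships (e.g. locally stored rejections of remote invites), which never entered the room's event stream and so cannot be recovered by the normal recents loader. The decision in isolation, as a hedged sketch:

def archived_batch_events(leave_event):
    # Out-of-band leave events exist only in our local copy, so hand
    # the event itself to the recents batch rather than asking the
    # timeline loader to find it in a stream it was never part of.
    if leave_event.internal_metadata.is_out_of_band_membership():
        return [leave_event]
    return None  # let _load_filtered_recents fetch the batch as usual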
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index f11b430126..283c6c1b81 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -19,6 +19,7 @@ from six import iteritems
 
 from twisted.internet import defer
 
+import synapse.metrics
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.roommember import ProfileInfo
@@ -125,9 +126,12 @@ class UserDirectoryHandler(object):
         """
         # FIXME(#3714): We should probably do this in the same worker as all
         # the other changes.
-        yield self.store.update_profile_in_user_dir(
-            user_id, profile.display_name, profile.avatar_url, None,
-        )
+        is_support = yield self.store.is_support_user(user_id)
+        # Support users are for diagnostics and should not appear in the user directory.
+        if not is_support:
+            yield self.store.update_profile_in_user_dir(
+                user_id, profile.display_name, profile.avatar_url, None
+            )
 
     @defer.inlineCallbacks
     def handle_user_deactivated(self, user_id):
@@ -160,6 +164,12 @@ class UserDirectoryHandler(object):
                 yield self._handle_deltas(deltas)
 
                 self.pos = deltas[-1]["stream_id"]
+
+                # Expose current event processing position to prometheus
+                synapse.metrics.event_processing_positions.labels("user_dir").set(
+                    self.pos
+                )
+
                 yield self.store.update_user_directory_stream_pos(self.pos)
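event_processing_positions is a labelled gauge, so each background processor reports its stream position under its own label. A standalone sketch with prometheus_client (the metric name is illustrative; Synapse defines its own in synapse.metrics):

from prometheus_client import Gauge

# Illustrative metric; Synapse defines its own in synapse.metrics.
event_processing_positions = Gauge(
    "example_event_processing_positions",
    "Current stream position of each event processor",
    ["name"],
)

def advance(pos):
    # One time series per processor, here the user directory updater.
    event_processing_positions.labels("user_dir").set(pos)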
 
     @defer.inlineCallbacks
@@ -182,21 +192,25 @@ class UserDirectoryHandler(object):
             logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
             yield self._handle_initial_room(room_id)
             num_processed_rooms += 1
-            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
 
         logger.info("Processed all rooms.")
 
         if self.search_all_users:
             num_processed_users = 0
             user_ids = yield self.store.get_all_local_users()
-            logger.info("Doing initial update of user directory. %d users", len(user_ids))
+            logger.info(
+                "Doing initial update of user directory. %d users", len(user_ids)
+            )
             for user_id in user_ids:
                 # We add profiles for all users even if they don't match the
                 # include pattern, just in case we want to change it in future
-                logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
+                logger.info(
+                    "Handling user %d/%d", num_processed_users + 1, len(user_ids)
+                )
                 yield self._handle_local_user(user_id)
                 num_processed_users += 1
-                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
+                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
 
             logger.info("Processed all users")
 
@@ -215,24 +229,24 @@ class UserDirectoryHandler(object):
         if not is_in_room:
             return
 
-        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
+        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
 
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
         user_ids = set(users_with_profile)
         unhandled_users = user_ids - self.initially_handled_users
 
         yield self.store.add_profiles_to_user_dir(
-            room_id, {
-                user_id: users_with_profile[user_id] for user_id in unhandled_users
-            }
+            room_id,
+            {user_id: users_with_profile[user_id] for user_id in unhandled_users},
         )
 
         self.initially_handled_users |= unhandled_users
 
         if is_public:
             yield self.store.add_users_to_public_room(
-                room_id,
-                user_ids=user_ids - self.initially_handled_users_in_public
+                room_id, user_ids=user_ids - self.initially_handled_users_in_public
             )
             self.initially_handled_users_in_public |= user_ids
 
@@ -244,7 +258,7 @@ class UserDirectoryHandler(object):
         count = 0
         for user_id in user_ids:
             if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
 
             if not self.is_mine_id(user_id):
                 count += 1
@@ -259,7 +273,7 @@ class UserDirectoryHandler(object):
                     continue
 
                 if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
                 count += 1
 
                 user_set = (user_id, other_user_id)
@@ -281,25 +295,23 @@ class UserDirectoryHandler(object):
 
                 if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
                     yield self.store.add_users_who_share_room(
-                        room_id, not is_public, to_insert,
+                        room_id, not is_public, to_insert
                     )
                     to_insert.clear()
 
                 if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
                     yield self.store.update_users_who_share_room(
-                        room_id, not is_public, to_update,
+                        room_id, not is_public, to_update
                     )
                     to_update.clear()
 
         if to_insert:
-            yield self.store.add_users_who_share_room(
-                room_id, not is_public, to_insert,
-            )
+            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
             to_insert.clear()
 
         if to_update:
             yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update,
+                room_id, not is_public, to_update
             )
             to_update.clear()
 
@@ -320,50 +332,55 @@ class UserDirectoryHandler(object):
             # may have become public or not and add/remove the users in said room
             if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
                 yield self._handle_room_publicity_change(
-                    room_id, prev_event_id, event_id, typ,
+                    room_id, prev_event_id, event_id, typ
                 )
             elif typ == EventTypes.Member:
                 change = yield self._get_key_change(
-                    prev_event_id, event_id,
+                    prev_event_id,
+                    event_id,
                     key_name="membership",
                     public_value=Membership.JOIN,
                 )
 
-                if change is None:
-                    # Handle any profile changes
-                    yield self._handle_profile_change(
-                        state_key, room_id, prev_event_id, event_id,
-                    )
-                    continue
-
-                if not change:
+                if change is False:
                     # Need to check if the server left the room entirely, if so
                     # we might need to remove all the users in that room
                     is_in_room = yield self.store.is_host_joined(
-                        room_id, self.server_name,
+                        room_id, self.server_name
                     )
                     if not is_in_room:
                         logger.info("Server left room: %r", room_id)
                         # Fetch all the users that we marked as being in user
                        # directory due to being in the room and then check if
                        # we need to remove those users or not
-                        user_ids = yield self.store.get_users_in_dir_due_to_room(room_id)
+                        user_ids = yield self.store.get_users_in_dir_due_to_room(
+                            room_id
+                        )
                         for user_id in user_ids:
                             yield self._handle_remove_user(room_id, user_id)
                         return
                     else:
                         logger.debug("Server is still in room: %r", room_id)
 
-                if change:  # The user joined
-                    event = yield self.store.get_event(event_id, allow_none=True)
-                    profile = ProfileInfo(
-                        avatar_url=event.content.get("avatar_url"),
-                        display_name=event.content.get("displayname"),
-                    )
+                is_support = yield self.store.is_support_user(state_key)
+                if not is_support:
+                    if change is None:
+                        # Handle any profile changes
+                        yield self._handle_profile_change(
+                            state_key, room_id, prev_event_id, event_id
+                        )
+                        continue
+
+                    if change:  # The user joined
+                        event = yield self.store.get_event(event_id, allow_none=True)
+                        profile = ProfileInfo(
+                            avatar_url=event.content.get("avatar_url"),
+                            display_name=event.content.get("displayname"),
+                        )
 
-                    yield self._handle_new_user(room_id, state_key, profile)
-                else:  # The user left
-                    yield self._handle_remove_user(room_id, state_key)
+                        yield self._handle_new_user(room_id, state_key, profile)
+                    else:  # The user left
+                        yield self._handle_remove_user(room_id, state_key)
             else:
                 logger.debug("Ignoring irrelevant type: %r", typ)
 
@@ -382,13 +399,15 @@ class UserDirectoryHandler(object):
 
         if typ == EventTypes.RoomHistoryVisibility:
             change = yield self._get_key_change(
-                prev_event_id, event_id,
+                prev_event_id,
+                event_id,
                 key_name="history_visibility",
                 public_value="world_readable",
             )
         elif typ == EventTypes.JoinRules:
             change = yield self._get_key_change(
-                prev_event_id, event_id,
+                prev_event_id,
+                event_id,
                 key_name="join_rule",
                 public_value=JoinRules.PUBLIC,
             )
@@ -513,7 +532,7 @@ class UserDirectoryHandler(object):
             )
             if self.is_mine_id(other_user_id) and not is_appservice:
                 shared_is_private = yield self.store.get_if_users_share_a_room(
-                    other_user_id, user_id,
+                    other_user_id, user_id
                 )
                 if shared_is_private is True:
                     # We've already marked in the database they share a private room
@@ -528,13 +547,11 @@ class UserDirectoryHandler(object):
                     to_insert.add((other_user_id, user_id))
 
         if to_insert:
-            yield self.store.add_users_who_share_room(
-                room_id, not is_public, to_insert,
-            )
+            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
 
         if to_update:
             yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update,
+                room_id, not is_public, to_update
             )
 
     @defer.inlineCallbacks
@@ -553,15 +570,15 @@ class UserDirectoryHandler(object):
         row = yield self.store.get_user_in_public_room(user_id)
         update_user_in_public = row and row["room_id"] == room_id
 
-        if (update_user_in_public or update_user_dir):
+        if update_user_in_public or update_user_dir:
             # XXX: Make this faster?
             rooms = yield self.store.get_rooms_for_user(user_id)
             for j_room_id in rooms:
-                if (not update_user_in_public and not update_user_dir):
+                if not update_user_in_public and not update_user_dir:
                     break
 
                 is_in_room = yield self.store.is_host_joined(
-                    j_room_id, self.server_name,
+                    j_room_id, self.server_name
                 )
 
                 if not is_in_room:
@@ -589,19 +606,19 @@ class UserDirectoryHandler(object):
         # Get a list of user tuples that were in the DB due to this room and
         # users (this includes tuples where the other user matches `user_id`)
         user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
-            user_id, room_id,
+            user_id, room_id
         )
 
         for user_id, other_user_id in user_tuples:
             # For each user tuple get a list of rooms that they still share,
             # trying to find a private room, and update the entry in the DB
-            rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id)
+            rooms = yield self.store.get_rooms_in_common_for_users(
+                user_id, other_user_id
+            )
 
             # If they don't share a room anymore, remove the mapping
             if not rooms:
-                yield self.store.remove_user_who_share_room(
-                    user_id, other_user_id,
-                )
+                yield self.store.remove_user_who_share_room(user_id, other_user_id)
                 continue
 
             found_public_share = None
@@ -615,13 +632,13 @@ class UserDirectoryHandler(object):
                 else:
                     found_public_share = None
                     yield self.store.update_users_who_share_room(
-                        room_id, not is_public, [(user_id, other_user_id)],
+                        room_id, not is_public, [(user_id, other_user_id)]
                     )
                     break
 
             if found_public_share:
                 yield self.store.update_users_who_share_room(
-                    room_id, not is_public, [(user_id, other_user_id)],
+                    room_id, not is_public, [(user_id, other_user_id)]
                 )
 
     @defer.inlineCallbacks
@@ -649,7 +666,7 @@ class UserDirectoryHandler(object):
 
         if prev_name != new_name or prev_avatar != new_avatar:
             yield self.store.update_profile_in_user_dir(
-                user_id, new_name, new_avatar, room_id,
+                user_id, new_name, new_avatar, room_id
             )
 
     @defer.inlineCallbacks