summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- synapse/handlers/appservice.py 27
-rw-r--r-- synapse/handlers/auth.py 84
-rw-r--r-- synapse/handlers/deactivate_account.py 17
-rw-r--r-- synapse/handlers/device.py 2
-rw-r--r-- synapse/handlers/directory.py 135
-rw-r--r-- synapse/handlers/e2e_keys.py 7
-rw-r--r-- synapse/handlers/e2e_room_keys.py 297
-rw-r--r-- synapse/handlers/federation.py 668
-rw-r--r-- synapse/handlers/groups_local.py 18
-rw-r--r-- synapse/handlers/identity.py 32
-rw-r--r-- synapse/handlers/initial_sync.py 10
-rw-r--r-- synapse/handlers/message.py 140
-rw-r--r-- synapse/handlers/pagination.py 29
-rw-r--r-- synapse/handlers/presence.py 33
-rw-r--r-- synapse/handlers/profile.py 41
-rw-r--r-- synapse/handlers/read_marker.py 2
-rw-r--r-- synapse/handlers/receipts.py 18
-rw-r--r-- synapse/handlers/register.py 80
-rw-r--r-- synapse/handlers/room.py 456
-rw-r--r-- synapse/handlers/room_list.py 15
-rw-r--r-- synapse/handlers/room_member.py 8
-rw-r--r-- synapse/handlers/room_member_worker.py 41
-rw-r--r-- synapse/handlers/search.py 25
-rw-r--r-- synapse/handlers/sync.py 416
-rw-r--r-- synapse/handlers/typing.py 37
-rw-r--r-- synapse/handlers/user_directory.py 18
26 files changed, 2033 insertions, 623 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ee41aed69e..17eedf4dbf 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,7 +23,12 @@ from twisted.internet import defer
 
 import synapse
 from synapse.api.constants import EventTypes
+from synapse.metrics import (
+    event_processing_loop_counter,
+    event_processing_loop_room_count,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import log_failure
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.metrics import Measure
 
@@ -32,17 +37,6 @@ logger = logging.getLogger(__name__)
 events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
 
 
-def log_failure(failure):
-    logger.error(
-        "Application Services Failure",
-        exc_info=(
-            failure.type,
-            failure.value,
-            failure.getTracebackObject()
-        )
-    )
-
-
 class ApplicationServicesHandler(object):
 
     def __init__(self, hs):
@@ -108,7 +102,10 @@ class ApplicationServicesHandler(object):
 
                         if not self.started_scheduler:
                             def start_scheduler():
-                                return self.scheduler.start().addErrback(log_failure)
+                                return self.scheduler.start().addErrback(
+                                    log_failure, "Application Services Failure",
+                                )
+
                             run_as_background_process("as_scheduler", start_scheduler)
                             self.started_scheduler = True
 
@@ -136,6 +133,12 @@ class ApplicationServicesHandler(object):
 
                     events_processed_counter.inc(len(events))
 
+                    event_processing_loop_room_count.labels(
+                        "appservice_sender"
+                    ).inc(len(events_by_room))
+
+                    event_processing_loop_counter.labels("appservice_sender").inc()
+
                     synapse.metrics.event_processing_lag.labels(
                         "appservice_sender").set(now - ts)
                     synapse.metrics.event_processing_last_ts.labels(
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 184eef09d0..c6e89db4bc 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
 import pymacaroons
 from canonicaljson import json
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.web.client import PartialDownloadError
 
 import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
+from synapse.util import logcontext
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
 
 from ._base import BaseHandler
 
@@ -59,6 +59,7 @@ class AuthHandler(BaseHandler):
             LoginType.EMAIL_IDENTITY: self._check_email_identity,
             LoginType.MSISDN: self._check_msisdn,
             LoginType.DUMMY: self._check_dummy_auth,
+            LoginType.TERMS: self._check_terms_auth,
         }
         self.bcrypt_rounds = hs.config.bcrypt_rounds
 
@@ -431,6 +432,9 @@ class AuthHandler(BaseHandler):
     def _check_dummy_auth(self, authdict, _):
         return defer.succeed(True)
 
+    def _check_terms_auth(self, authdict, _):
+        return defer.succeed(True)
+
     @defer.inlineCallbacks
     def _check_threepid(self, medium, authdict):
         if 'threepid_creds' not in authdict:
@@ -462,6 +466,22 @@ class AuthHandler(BaseHandler):
     def _get_params_recaptcha(self):
         return {"public_key": self.hs.config.recaptcha_public_key}
 
+    def _get_params_terms(self):
+        return {
+            "policies": {
+                "privacy_policy": {
+                    "version": self.hs.config.user_consent_version,
+                    "en": {
+                        "name": self.hs.config.user_consent_policy_name,
+                        "url": "%s_matrix/consent?v=%s" % (
+                            self.hs.config.public_baseurl,
+                            self.hs.config.user_consent_version,
+                        ),
+                    },
+                },
+            },
+        }
+
     def _auth_dict_for_flows(self, flows, session):
         public_flows = []
         for f in flows:
@@ -469,6 +489,7 @@ class AuthHandler(BaseHandler):
 
         get_params = {
             LoginType.RECAPTCHA: self._get_params_recaptcha,
+            LoginType.TERMS: self._get_params_terms,
         }
 
         params = {}
@@ -520,7 +541,7 @@ class AuthHandler(BaseHandler):
         """
         logger.info("Logging in user %s on device %s", user_id, device_id)
         access_token = yield self.issue_access_token(user_id, device_id)
-        yield self._check_mau_limits()
+        yield self.auth.check_auth_blocking(user_id)
 
         # the device *should* have been registered before we got here; however,
         # it's possible we raced against a DELETE operation. The thing we
@@ -734,7 +755,6 @@ class AuthHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def validate_short_term_login_token_and_get_user_id(self, login_token):
-        yield self._check_mau_limits()
         auth_api = self.hs.get_auth()
         user_id = None
         try:
@@ -743,6 +763,7 @@ class AuthHandler(BaseHandler):
             auth_api.validate_macaroon(macaroon, "login", True, user_id)
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+        yield self.auth.check_auth_blocking(user_id)
         defer.returnValue(user_id)
 
     @defer.inlineCallbacks
@@ -828,12 +849,26 @@ class AuthHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def delete_threepid(self, user_id, medium, address):
+        """Attempts to unbind the 3pid on the identity servers and deletes it
+        from the local database.
+
+        Args:
+            user_id (str)
+            medium (str)
+            address (str)
+
+        Returns:
+            Deferred[bool]: Returns True if successfully unbound the 3pid on
+            the identity server, False if identity server doesn't support the
+            unbind API.
+        """
+
         # 'Canonicalise' email addresses as per above
         if medium == 'email':
             address = address.lower()
 
         identity_handler = self.hs.get_handlers().identity_handler
-        yield identity_handler.unbind_threepid(
+        result = yield identity_handler.try_unbind_threepid(
             user_id,
             {
                 'medium': medium,
@@ -841,10 +876,10 @@ class AuthHandler(BaseHandler):
             },
         )
 
-        ret = yield self.store.user_delete_threepid(
+        yield self.store.user_delete_threepid(
             user_id, medium, address,
         )
-        defer.returnValue(ret)
+        defer.returnValue(result)
 
     def _save_session(self, session):
         # TODO: Persistent storage
@@ -870,56 +905,35 @@ class AuthHandler(BaseHandler):
                 bcrypt.gensalt(self.bcrypt_rounds),
             ).decode('ascii')
 
-        return make_deferred_yieldable(
-            threads.deferToThreadPool(
-                self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
-            ),
-        )
+        return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
 
         Args:
             password (unicode): Password to hash.
-            stored_hash (unicode): Expected hash value.
+            stored_hash (bytes): Expected hash value.
 
         Returns:
             Deferred(bool): Whether self.hash(password) == stored_hash.
         """
-
         def _do_validate_hash():
             # Normalise the Unicode in the password
             pw = unicodedata.normalize("NFKC", password)
 
             return bcrypt.checkpw(
                 pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
-                stored_hash.encode('utf8')
+                stored_hash
             )
 
         if stored_hash:
-            return make_deferred_yieldable(
-                threads.deferToThreadPool(
-                    self.hs.get_reactor(),
-                    self.hs.get_reactor().getThreadPool(),
-                    _do_validate_hash,
-                ),
-            )
+            if not isinstance(stored_hash, bytes):
+                stored_hash = stored_hash.encode('ascii')
+
+            return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
         else:
             return defer.succeed(False)
 
-    @defer.inlineCallbacks
-    def _check_mau_limits(self):
-        """
-        Ensure that if mau blocking is enabled that invalid users cannot
-        log in.
-        """
-        if self.hs.config.limit_usage_by_mau is True:
-            current_mau = yield self.store.count_monthly_users()
-            if current_mau >= self.hs.config.max_mau_value:
-                raise AuthError(
-                    403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED
-                )
-
 
 @attr.s
 class MacaroonGenerator(object):
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b3c5a9ee64..75fe50c42c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,8 +17,8 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, create_requester
-from synapse.util.logcontext import run_in_background
 
 from ._base import BaseHandler
 
@@ -51,7 +51,8 @@ class DeactivateAccountHandler(BaseHandler):
             erase_data (bool): whether to GDPR-erase the user's data
 
         Returns:
-            Deferred
+            Deferred[bool]: True if identity server supports removing
+            threepids, otherwise False.
         """
         # FIXME: Theoretically there is a race here wherein user resets
         # password using threepid.
@@ -60,16 +61,22 @@ class DeactivateAccountHandler(BaseHandler):
         # leave the user still active so they can try again.
         # Ideally we would prevent password resets and then do this in the
         # background thread.
+
+        # This will be set to false if the identity server doesn't support
+        # unbinding
+        identity_server_supports_unbinding = True
+
         threepids = yield self.store.user_get_threepids(user_id)
         for threepid in threepids:
             try:
-                yield self._identity_handler.unbind_threepid(
+                result = yield self._identity_handler.try_unbind_threepid(
                     user_id,
                     {
                         'medium': threepid['medium'],
                         'address': threepid['address'],
                     },
                 )
+                identity_server_supports_unbinding &= result
             except Exception:
                 # Do we want this to be a fatal error or should we carry on?
                 logger.exception("Failed to remove threepid from ID server")
@@ -103,6 +110,8 @@ class DeactivateAccountHandler(BaseHandler):
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
 
+        defer.returnValue(identity_server_supports_unbinding)
+
     def _start_user_parting(self):
         """
         Start the process that goes through the table of users
@@ -112,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
             None
         """
         if not self._user_parter_running:
-            run_in_background(self._user_parter_loop)
+            run_as_background_process("user_parter_loop", self._user_parter_loop)
 
     @defer.inlineCallbacks
     def _user_parter_loop(self):
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 2d44f15da3..9e017116a9 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import FederationDeniedError
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import stringutils
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index ef866da1b6..0699731c13 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -20,7 +20,14 @@ import string
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
-from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    CodeMessageException,
+    Codes,
+    NotFoundError,
+    StoreError,
+    SynapseError,
+)
 from synapse.types import RoomAlias, UserID, get_domain_from_id
 
 from ._base import BaseHandler
@@ -36,6 +43,7 @@ class DirectoryHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self.appservice_handler = hs.get_application_service_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.config = hs.config
 
         self.federation = hs.get_federation_client()
         hs.get_federation_registry().register_query_handler(
@@ -73,43 +81,96 @@ class DirectoryHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def create_association(self, user_id, room_alias, room_id, servers=None):
-        # association creation for human users
-        # TODO(erikj): Do user auth.
+    def create_association(self, requester, room_alias, room_id, servers=None,
+                           send_event=True):
+        """Attempt to create a new alias
 
-        if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
-            raise SynapseError(
-                403, "This user is not permitted to create this alias",
-            )
+        Args:
+            requester (Requester)
+            room_alias (RoomAlias)
+            room_id (str)
+            servers (list[str]|None): List of servers that other servers
+                should try and join via
+            send_event (bool): Whether to send an updated m.room.aliases event
 
-        can_create = yield self.can_modify_alias(
-            room_alias,
-            user_id=user_id
-        )
-        if not can_create:
-            raise SynapseError(
-                400, "This alias is reserved by an application service.",
-                errcode=Codes.EXCLUSIVE
+        Returns:
+            Deferred
+        """
+
+        user_id = requester.user.to_string()
+
+        service = requester.app_service
+        if service:
+            if not service.is_interested_in_alias(room_alias.to_string()):
+                raise SynapseError(
+                    400, "This application service has not reserved"
+                    " this kind of alias.", errcode=Codes.EXCLUSIVE
+                )
+        else:
+            if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
+                raise AuthError(
+                    403, "This user is not permitted to create this alias",
+                )
+
+            if not self.config.is_alias_creation_allowed(user_id, room_alias.to_string()):
+                # Let's just return a generic message, as there may be all sorts of
+                # reasons why we said no. TODO: Allow configurable error messages
+                # per alias creation rule?
+                raise SynapseError(
+                    403, "Not allowed to create alias",
+                )
+
+            can_create = yield self.can_modify_alias(
+                room_alias,
+                user_id=user_id
             )
+            if not can_create:
+                raise AuthError(
+                    400, "This alias is reserved by an application service.",
+                    errcode=Codes.EXCLUSIVE
+                )
+
         yield self._create_association(room_alias, room_id, servers, creator=user_id)
+        if send_event:
+            yield self.send_room_alias_update_event(
+                requester,
+                room_id
+            )
 
     @defer.inlineCallbacks
-    def create_appservice_association(self, service, room_alias, room_id,
-                                      servers=None):
-        if not service.is_interested_in_alias(room_alias.to_string()):
-            raise SynapseError(
-                400, "This application service has not reserved"
-                " this kind of alias.", errcode=Codes.EXCLUSIVE
-            )
+    def delete_association(self, requester, room_alias, send_event=True):
+        """Remove an alias from the directory
 
-        # association creation for app services
-        yield self._create_association(room_alias, room_id, servers)
+        (this is only meant for human users; AS users should call
+        delete_appservice_association)
 
-    @defer.inlineCallbacks
-    def delete_association(self, requester, user_id, room_alias):
-        # association deletion for human users
+        Args:
+            requester (Requester):
+            room_alias (RoomAlias):
+            send_event (bool): Whether to send an updated m.room.aliases event.
+                Note that, if we delete the canonical alias, we will always attempt
+                to send an m.room.canonical_alias event
+
+        Returns:
+            Deferred[unicode]: room id that the alias used to point to
+
+        Raises:
+            NotFoundError: if the alias doesn't exist
+
+            AuthError: if the user doesn't have perms to delete the alias (ie, the user
+                is neither the creator of the alias, nor a server admin).
+
+            SynapseError: if the alias belongs to an AS
+        """
+        user_id = requester.user.to_string()
+
+        try:
+            can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+        except StoreError as e:
+            if e.code == 404:
+                raise NotFoundError("Unknown room alias")
+            raise
 
-        can_delete = yield self._user_can_delete_alias(room_alias, user_id)
         if not can_delete:
             raise AuthError(
                 403, "You don't have permission to delete the alias.",
@@ -128,11 +189,11 @@ class DirectoryHandler(BaseHandler):
         room_id = yield self._delete_association(room_alias)
 
         try:
-            yield self.send_room_alias_update_event(
-                requester,
-                requester.user.to_string(),
-                room_id
-            )
+            if send_event:
+                yield self.send_room_alias_update_event(
+                    requester,
+                    room_id
+                )
 
             yield self._update_canonical_alias(
                 requester,
@@ -248,7 +309,7 @@ class DirectoryHandler(BaseHandler):
             )
 
     @defer.inlineCallbacks
-    def send_room_alias_update_event(self, requester, user_id, room_id):
+    def send_room_alias_update_event(self, requester, room_id):
         aliases = yield self.store.get_aliases_for_room(room_id)
 
         yield self.event_creation_handler.create_and_send_nonmember_event(
@@ -257,7 +318,7 @@ class DirectoryHandler(BaseHandler):
                 "type": EventTypes.Aliases,
                 "state_key": self.hs.hostname,
                 "room_id": room_id,
-                "sender": user_id,
+                "sender": requester.user.to_string(),
                 "content": {"aliases": aliases},
             },
             ratelimit=False
@@ -320,7 +381,7 @@ class DirectoryHandler(BaseHandler):
     def _user_can_delete_alias(self, alias, user_id):
         creator = yield self.store.get_room_alias_creator(alias.to_string())
 
-        if creator and creator == user_id:
+        if creator is not None and creator == user_id:
             defer.returnValue(True)
 
         is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 5816bf8b4f..9dc46aa15f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -330,7 +330,8 @@ class E2eKeysHandler(object):
                         (algorithm, key_id, ex_json, key)
                     )
             else:
-                new_keys.append((algorithm, key_id, encode_canonical_json(key)))
+                new_keys.append((
+                    algorithm, key_id, encode_canonical_json(key).decode('ascii')))
 
         yield self.store.add_e2e_one_time_keys(
             user_id, device_id, time_now, new_keys
@@ -340,7 +341,7 @@ class E2eKeysHandler(object):
 def _exception_to_failure(e):
     if isinstance(e, CodeMessageException):
         return {
-            "status": e.code, "message": e.message,
+            "status": e.code, "message": str(e),
         }
 
     if isinstance(e, NotRetryingDestination):
@@ -358,7 +359,7 @@ def _exception_to_failure(e):
     # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
     # give a string for e.message, which json then fails to serialize.
     return {
-        "status": 503, "message": str(e.message),
+        "status": 503, "message": str(e),
     }
 
 
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
new file mode 100644
index 0000000000..42b040375f
--- /dev/null
+++ b/synapse/handlers/e2e_room_keys.py
@@ -0,0 +1,297 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017, 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from six import iteritems
+
+from twisted.internet import defer
+
+from synapse.api.errors import NotFoundError, RoomKeysVersionError, StoreError
+from synapse.util.async_helpers import Linearizer
+
+logger = logging.getLogger(__name__)
+
+
+class E2eRoomKeysHandler(object):
+    """
+    Implements an optional realtime backup mechanism for encrypted E2E megolm room keys.
+    This gives a way for users to store and recover their megolm keys if they lose all
+    their clients. It should also extend easily to future room key mechanisms.
+    The actual payload of the encrypted keys is completely opaque to the handler.
+    """
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+        # Used to lock whenever a client is uploading key data.  This prevents collisions
+        # between clients trying to upload the details of a new session, given all
+        # clients belonging to a user will receive and try to upload a new session at
+        # roughly the same time.  Also used to lock out uploads when the key is being
+        # changed.
+        self._upload_linearizer = Linearizer("upload_room_keys_lock")
+
+    @defer.inlineCallbacks
+    def get_room_keys(self, user_id, version, room_id=None, session_id=None):
+        """Bulk get the E2E room keys for a given backup, optionally filtered to a given
+        room, or a given session.
+        See EndToEndRoomKeyStore.get_e2e_room_keys for full details.
+
+        Args:
+            user_id(str): the user whose keys we're getting
+            version(str): the version ID of the backup we're getting keys from
+            room_id(string): room ID to get keys for, or None to get keys for all rooms
+            session_id(string): session ID to get keys for, or None to get keys for all
+                sessions
+        Raises:
+            NotFoundError: if the backup version does not exist
+        Returns:
+            A deferred list of dicts giving the session_data and message metadata for
+            these room keys.
+        """
+
+        # we deliberately take the lock to get keys so that changing the version
+        # works atomically
+        with (yield self._upload_linearizer.queue(user_id)):
+            # make sure the backup version exists
+            try:
+                yield self.store.get_e2e_room_keys_version_info(user_id, version)
+            except StoreError as e:
+                if e.code == 404:
+                    raise NotFoundError("Unknown backup version")
+                else:
+                    raise
+
+            results = yield self.store.get_e2e_room_keys(
+                user_id, version, room_id, session_id
+            )
+
+            defer.returnValue(results)
+
+    @defer.inlineCallbacks
+    def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
+        """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
+        room or a given session.
+        See EndToEndRoomKeyStore.delete_e2e_room_keys for full details.
+
+        Args:
+            user_id(str): the user whose backup we're deleting
+            version(str): the version ID of the backup we're deleting
+            room_id(string): room ID to delete keys for, or None to delete keys for all
+                rooms
+            session_id(string): session ID to delete keys for, or None to delete keys
+                for all sessions
+        Returns:
+            A deferred of the deletion transaction
+        """
+
+        # lock for consistency with uploading
+        with (yield self._upload_linearizer.queue(user_id)):
+            yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
+
+    @defer.inlineCallbacks
+    def upload_room_keys(self, user_id, version, room_keys):
+        """Bulk upload a list of room keys into a given backup version, asserting
+        that the given version is the current backup version.  room_keys are merged
+        into the current backup as described in RoomKeysServlet.on_PUT().
+
+        Args:
+            user_id(str): the user whose backup we're setting
+            version(str): the version ID of the backup we're updating
+            room_keys(dict): a nested dict describing the room_keys we're setting:
+
+        {
+            "rooms": {
+                "!abc:matrix.org": {
+                    "sessions": {
+                        "c0ff33": {
+                            "first_message_index": 1,
+                            "forwarded_count": 1,
+                            "is_verified": false,
+                            "session_data": "SSBBTSBBIEZJU0gK"
+                        }
+                    }
+                }
+            }
+        }
+
+        Raises:
+            NotFoundError: if there are no versions defined
+            RoomKeysVersionError: if the uploaded version is not the current version
+        """
+
+        # TODO: Validate the JSON to make sure it has the right keys.
+
+        # XXX: perhaps we should use a finer grained lock here?
+        with (yield self._upload_linearizer.queue(user_id)):
+
+            # Check that the version we're trying to upload is the current version
+            try:
+                version_info = yield self.store.get_e2e_room_keys_version_info(user_id)
+            except StoreError as e:
+                if e.code == 404:
+                    raise NotFoundError("Version '%s' not found" % (version,))
+                else:
+                    raise
+
+            if version_info['version'] != version:
+                # Check that the version we're trying to upload actually exists
+                try:
+                    version_info = yield self.store.get_e2e_room_keys_version_info(
+                        user_id, version,
+                    )
+                    # if we get this far, the version must exist
+                    raise RoomKeysVersionError(current_version=version_info['version'])
+                except StoreError as e:
+                    if e.code == 404:
+                        raise NotFoundError("Version '%s' not found" % (version,))
+                    else:
+                        raise
+
+            # go through the room_keys.
+            # XXX: this should/could be done concurrently, given we're in a lock.
+            for room_id, room in iteritems(room_keys['rooms']):
+                for session_id, session in iteritems(room['sessions']):
+                    yield self._upload_room_key(
+                        user_id, version, room_id, session_id, session
+                    )
+
+    @defer.inlineCallbacks
+    def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
+        """Upload a given room_key for a given room and session into a given
+        version of the backup.  Merges the key with any which might already exist.
+
+        Args:
+            user_id(str): the user whose backup we're setting
+            version(str): the version ID of the backup we're updating
+            room_id(str): the ID of the room whose keys we're setting
+            session_id(str): the session whose room_key we're setting
+            room_key(dict): the room_key being set
+        """
+
+        # get the room_key for this particular row
+        current_room_key = None
+        try:
+            current_room_key = yield self.store.get_e2e_room_key(
+                user_id, version, room_id, session_id
+            )
+        except StoreError as e:
+            if e.code == 404:
+                pass
+            else:
+                raise
+
+        if self._should_replace_room_key(current_room_key, room_key):
+            yield self.store.set_e2e_room_key(
+                user_id, version, room_id, session_id, room_key
+            )
+
+    @staticmethod
+    def _should_replace_room_key(current_room_key, room_key):
+        """
+        Determine whether to replace a given current_room_key (if any)
+        with a newly uploaded room_key backup
+
+        Args:
+            current_room_key (dict): Optional, the current room_key dict if any
+            room_key (dict): The new room_key dict which may or may not be fit to
+                replace the current_room_key
+
+        Returns:
+            True if current_room_key should be replaced by room_key in the backup
+        """
+
+        if current_room_key:
+            # spelt out with if/elifs rather than nested boolean expressions
+            # purely for legibility.
+
+            if room_key['is_verified'] and not current_room_key['is_verified']:
+                return True
+            elif (
+                room_key['first_message_index'] <
+                current_room_key['first_message_index']
+            ):
+                return True
+            elif room_key['forwarded_count'] < current_room_key['forwarded_count']:
+                return True
+            else:
+                return False
+        return True
+
+    @defer.inlineCallbacks
+    def create_version(self, user_id, version_info):
+        """Create a new backup version.  This automatically becomes the new
+        backup version for the user's keys; previous backups will no longer be
+        writable.
+
+        Args:
+            user_id(str): the user whose backup version we're creating
+            version_info(dict): metadata about the new version being created
+
+        {
+            "algorithm": "m.megolm_backup.v1",
+            "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+        }
+
+        Returns:
+            A deferred of a string that gives the new version number.
+        """
+
+        # TODO: Validate the JSON to make sure it has the right keys.
+
+        # lock everyone out until we've switched version
+        with (yield self._upload_linearizer.queue(user_id)):
+            new_version = yield self.store.create_e2e_room_keys_version(
+                user_id, version_info
+            )
+            defer.returnValue(new_version)
+
+    @defer.inlineCallbacks
+    def get_version_info(self, user_id, version=None):
+        """Get the info about a given version of the user's backup
+
+        Args:
+            user_id(str): the user whose current backup version we're querying
+            version(str): Optional; if None gives the most recent version
+                otherwise a historical one.
+        Raises:
+            StoreError: code 404 if the requested backup version doesn't exist
+        Returns:
+            A deferred of an info dict that gives the info about the requested version.
+
+        {
+            "version": "1234",
+            "algorithm": "m.megolm_backup.v1",
+            "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+        }
+        """
+
+        with (yield self._upload_linearizer.queue(user_id)):
+            res = yield self.store.get_e2e_room_keys_version_info(user_id, version)
+            defer.returnValue(res)
+
+    @defer.inlineCallbacks
+    def delete_version(self, user_id, version=None):
+        """Deletes a given version of the user's e2e_room_keys backup
+
+        Args:
+            user_id(str): the user whose current backup version we're deleting
+            version(str): the version id of the backup being deleted
+        Raises:
+            StoreError: code 404 if this backup version doesn't exist
+        """
+
+        with (yield self._upload_linearizer.queue(user_id)):
+            yield self.store.delete_e2e_room_keys_version(user_id, version)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 533b82c783..a3bb864bb2 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -18,7 +18,6 @@
 
 import itertools
 import logging
-import sys
 
 import six
 from six import iteritems, itervalues
@@ -30,7 +29,12 @@ from unpaddedbase64 import decode_base64
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.api.constants import (
+    KNOWN_ROOM_VERSIONS,
+    EventTypes,
+    Membership,
+    RejectedReason,
+)
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -44,10 +48,15 @@ from synapse.crypto.event_signing import (
     compute_event_signature,
 )
 from synapse.events.validator import EventValidator
-from synapse.state import resolve_events_with_factory
+from synapse.replication.http.federation import (
+    ReplicationCleanRoomRestServlet,
+    ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
+from synapse.state import StateResolutionStore, resolve_events_with_store
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import logcontext, unwrapFirstError
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room
 from synapse.util.frozenutils import unfreeze
 from synapse.util.logutils import log_function
@@ -59,6 +68,27 @@ from ._base import BaseHandler
 logger = logging.getLogger(__name__)
 
 
+def shortstr(iterable, maxitems=5):
+    """If iterable has maxitems or fewer, return the stringification of a list
+    containing those items.
+
+    Otherwise, return the stringification of a list with the first maxitems items,
+    followed by "...".
+
+    Args:
+        iterable (Iterable): iterable to truncate
+        maxitems (int): number of items to return before truncating
+
+    Returns:
+        unicode
+    """
+
+    items = list(itertools.islice(iterable, maxitems + 1))
+    if len(items) <= maxitems:
+        return str(items)
+    return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
 class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
@@ -75,7 +105,7 @@ class FederationHandler(BaseHandler):
 
         self.hs = hs
 
-        self.store = hs.get_datastore()
+        self.store = hs.get_datastore()  # type: synapse.storage.DataStore
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
@@ -86,15 +116,26 @@ class FederationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self._server_notices_mxid = hs.config.server_notices_mxid
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+
+        self._send_events_to_master = (
+            ReplicationFederationSendEventsRestServlet.make_client(hs)
+        )
+        self._notify_user_membership_change = (
+            ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+        )
+        self._clean_room_for_join_client = (
+            ReplicationCleanRoomRestServlet.make_client(hs)
+        )
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
 
     @defer.inlineCallbacks
-    @log_function
     def on_receive_pdu(
-            self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+            self, origin, pdu, sent_to_us_directly=False,
     ):
         """ Process a PDU received via a federation /send/ transaction, or
         via backfill of missing prev_events
@@ -103,14 +144,23 @@ class FederationHandler(BaseHandler):
             origin (str): server which initiated the /send/ transaction. Will
                 be used to fetch missing events or state.
             pdu (FrozenEvent): received PDU
-            get_missing (bool): True if we should fetch missing prev_events
+            sent_to_us_directly (bool): True if this event was pushed to us; False if
+                we pulled it as the result of a missing prev_event.
 
         Returns (Deferred): completes with None
         """
 
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
+        logger.info(
+            "[%s %s] handling received PDU: %s",
+            room_id, event_id, pdu,
+        )
+
         # We reprocess pdus when we have seen them only as outliers
         existing = yield self.store.get_event(
-            pdu.event_id,
+            event_id,
             allow_none=True,
             allow_rejected=True,
         )
@@ -125,7 +175,7 @@ class FederationHandler(BaseHandler):
             )
         )
         if already_seen:
-            logger.debug("Already seen pdu %s", pdu.event_id)
+            logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
             return
 
         # do some initial sanity-checking of the event. In particular, make
@@ -134,6 +184,7 @@ class FederationHandler(BaseHandler):
         try:
             self._sanity_check_event(pdu)
         except SynapseError as err:
+            logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
             raise FederationError(
                 "ERROR",
                 err.code,
@@ -143,33 +194,30 @@ class FederationHandler(BaseHandler):
 
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
-        if pdu.room_id in self.room_queues:
-            logger.info("Ignoring PDU %s for room %s from %s for now; join "
-                        "in progress", pdu.event_id, pdu.room_id, origin)
-            self.room_queues[pdu.room_id].append((pdu, origin))
+        if room_id in self.room_queues:
+            logger.info(
+                "[%s %s] Queuing PDU from %s for now: join in progress",
+                room_id, event_id, origin,
+            )
+            self.room_queues[room_id].append((pdu, origin))
             return
 
-        # If we're no longer in the room just ditch the event entirely. This
-        # is probably an old server that has come back and thinks we're still
-        # in the room (or we've been rejoined to the room by a state reset).
+        # If we're not in the room just ditch the event entirely. This is
+        # probably an old server that has come back and thinks we're still in
+        # the room (or we've been rejoined to the room by a state reset).
         #
-        # If we were never in the room then maybe our database got vaped and
-        # we should check if we *are* in fact in the room. If we are then we
-        # can magically rejoin the room.
+        # Note that if we were never in the room then we would have already
+        # dropped the event, since we wouldn't know the room version.
         is_in_room = yield self.auth.check_host_in_room(
-            pdu.room_id,
+            room_id,
             self.server_name
         )
         if not is_in_room:
-            was_in_room = yield self.store.was_host_joined(
-                pdu.room_id, self.server_name,
+            logger.info(
+                "[%s %s] Ignoring PDU from %s as we're not in the room",
+                room_id, event_id, origin,
             )
-            if was_in_room:
-                logger.info(
-                    "Ignoring PDU %s for room %s from %s as we've left the room!",
-                    pdu.event_id, pdu.room_id, origin,
-                )
-                defer.returnValue(None)
+            defer.returnValue(None)
 
         state = None
         auth_chain = []
@@ -182,11 +230,11 @@ class FederationHandler(BaseHandler):
             )
 
             logger.debug(
-                "_handle_new_pdu min_depth for %s: %d",
-                pdu.room_id, min_depth
+                "[%s %s] min_depth: %d",
+                room_id, event_id, min_depth,
             )
 
-            prevs = {e_id for e_id, _ in pdu.prev_events}
+            prevs = set(pdu.prev_event_ids())
             seen = yield self.store.have_seen_events(prevs)
 
             if min_depth and pdu.depth < min_depth:
@@ -196,17 +244,18 @@ class FederationHandler(BaseHandler):
                 # send to the clients.
                 pdu.internal_metadata.outlier = True
             elif min_depth and pdu.depth > min_depth:
-                if get_missing and prevs - seen:
+                missing_prevs = prevs - seen
+                if sent_to_us_directly and missing_prevs:
                     # If we're missing stuff, ensure we only fetch stuff one
                     # at a time.
                     logger.info(
-                        "Acquiring lock for room %r to fetch %d missing events: %r...",
-                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+                        "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
                     with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
                         logger.info(
-                            "Acquired lock for room %r to fetch %d missing events",
-                            pdu.room_id, len(prevs - seen),
+                            "[%s %s] Acquired room lock to fetch %d missing prev_events",
+                            room_id, event_id, len(missing_prevs),
                         )
 
                         yield self._get_missing_events_for_pdu(
@@ -219,68 +268,150 @@ class FederationHandler(BaseHandler):
 
                         if not prevs - seen:
                             logger.info(
-                                "Found all missing prev events for %s", pdu.event_id
+                                "[%s %s] Found all missing prev_events",
+                                room_id, event_id,
                             )
-                elif prevs - seen:
+                elif missing_prevs:
                     logger.info(
-                        "Not fetching %d missing events for room %r,event %s: %r...",
-                        len(prevs - seen), pdu.room_id, pdu.event_id,
-                        list(prevs - seen)[:5],
+                        "[%s %s] Not recursively fetching %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
 
-            if sent_to_us_directly and prevs - seen:
-                # If they have sent it to us directly, and the server
-                # isn't telling us about the auth events that it's
-                # made a message referencing, we explode
-                raise FederationError(
-                    "ERROR",
-                    403,
-                    (
-                        "Your server isn't divulging details about prev_events "
-                        "referenced in this event."
-                    ),
-                    affected=pdu.event_id,
-                )
-            elif prevs - seen:
-                # Calculate the state of the previous events, and
-                # de-conflict them to find the current state.
-                state_groups = []
+            if prevs - seen:
+                # We've still not been able to get all of the prev_events for this event.
+                #
+                # In this case, we need to fall back to asking another server in the
+                # federation for the state at this event. That's ok provided we then
+                # resolve the state against other bits of the DAG before using it (which
+                # will ensure that you can't just take over a room by sending an event,
+                # withholding its prev_events, and declaring yourself to be an admin in
+                # the subsequent state request).
+                #
+                # Now, if we're pulling this event as a missing prev_event, then clearly
+                # this event is not going to become the only forward-extremity and we are
+                # guaranteed to resolve its state against our existing forward
+                # extremities, so that should be fine.
+                #
+                # On the other hand, if this event was pushed to us, it is possible for
+                # it to become the only forward-extremity in the room, and we would then
+                # trust its state to be the state for the whole room. This is very bad.
+                # Further, if the event was pushed to us, there is no excuse for us not to
+                # have all the prev_events. We therefore reject any such events.
+                #
+                # XXX this really feels like it could/should be merged with the above,
+                # but there is an interaction with min_depth that I'm not really
+                # following.
+
+                if sent_to_us_directly:
+                    logger.warn(
+                        "[%s %s] Rejecting: failed to fetch %d prev events: %s",
+                        room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
+                    )
+                    raise FederationError(
+                        "ERROR",
+                        403,
+                        (
+                            "Your server isn't divulging details about prev_events "
+                            "referenced in this event."
+                        ),
+                        affected=pdu.event_id,
+                    )
+
+                # Calculate the state after each of the previous events, and
+                # resolve them to find the correct state at the current event.
                 auth_chains = set()
+                event_map = {
+                    event_id: pdu,
+                }
                 try:
                     # Get the state of the events we know about
-                    ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
-                    state_groups.append(ours)
+                    ours = yield self.store.get_state_groups_ids(room_id, seen)
+
+                    # state_maps is a list of mappings from (type, state_key) to event_id
+                    # type: list[dict[tuple[str, str], str]]
+                    state_maps = list(ours.values())
+
+                    # we don't need this any more, let's delete it.
+                    del ours
 
                     # Ask the remote server for the states we don't
                     # know about
                     for p in prevs - seen:
-                        state, got_auth_chain = (
-                            yield self.federation_client.get_state_for_room(
-                                origin, pdu.room_id, p
-                            )
-                        )
-                        auth_chains.update(got_auth_chain)
-                        state_group = {(x.type, x.state_key): x.event_id for x in state}
-                        state_groups.append(state_group)
-
-                    # Resolve any conflicting state
-                    def fetch(ev_ids):
-                        return self.store.get_events(
-                            ev_ids, get_prev_content=False, check_redacted=False
+                        logger.info(
+                            "[%s %s] Requesting state at missing prev_event %s",
+                            room_id, event_id, p,
                         )
 
-                    state_map = yield resolve_events_with_factory(
-                        state_groups, {pdu.event_id: pdu}, fetch
+                        with logcontext.nested_logging_context(p):
+                            # note that if any of the missing prevs share missing state or
+                            # auth events, the requests to fetch those events are deduped
+                            # by the get_pdu_cache in federation_client.
+                            remote_state, got_auth_chain = (
+                                yield self.federation_client.get_state_for_room(
+                                    origin, room_id, p,
+                                )
+                            )
+
+                            # we want the state *after* p; get_state_for_room returns the
+                            # state *before* p.
+                            remote_event = yield self.federation_client.get_pdu(
+                                [origin], p, outlier=True,
+                            )
+
+                            if remote_event is None:
+                                raise Exception(
+                                    "Unable to get missing prev_event %s" % (p, )
+                                )
+
+                            if remote_event.is_state():
+                                remote_state.append(remote_event)
+
+                            # XXX hrm I'm not convinced that duplicate events will compare
+                            # for equality, so I'm not sure this does what the author
+                            # hoped.
+                            auth_chains.update(got_auth_chain)
+
+                            remote_state_map = {
+                                (x.type, x.state_key): x.event_id for x in remote_state
+                            }
+                            state_maps.append(remote_state_map)
+
+                            for x in remote_state:
+                                event_map[x.event_id] = x
+
+                    room_version = yield self.store.get_room_version(room_id)
+                    state_map = yield resolve_events_with_store(
+                        room_version, state_maps, event_map,
+                        state_res_store=StateResolutionStore(self.store),
+                    )
+
+                    # We need to give _process_received_pdu the actual state events
+                    # rather than event ids, so generate that now.
+
+                    # First though we need to fetch all the events that are in
+                    # state_map, so we can build up the state below.
+                    evs = yield self.store.get_events(
+                        list(state_map.values()),
+                        get_prev_content=False,
+                        check_redacted=False,
                     )
+                    event_map.update(evs)
 
-                    state = (yield self.store.get_events(state_map.values())).values()
+                    state = [
+                        event_map[e] for e in six.itervalues(state_map)
+                    ]
                     auth_chain = list(auth_chains)
                 except Exception:
+                    logger.warn(
+                        "[%s %s] Error attempting to resolve state at missing "
+                        "prev_events",
+                        room_id, event_id, exc_info=True,
+                    )
                     raise FederationError(
                         "ERROR",
                         403,
                         "We can't get valid state history.",
-                        affected=pdu.event_id,
+                        affected=event_id,
                     )
 
         yield self._process_received_pdu(
@@ -299,15 +430,16 @@ class FederationHandler(BaseHandler):
             prevs (set(str)): List of event ids which we are missing
             min_depth (int): Minimum depth of events to return.
         """
-        # We recalculate seen, since it may have changed.
+
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
         seen = yield self.store.have_seen_events(prevs)
 
         if not prevs - seen:
             return
 
-        latest = yield self.store.get_latest_event_ids_in_room(
-            pdu.room_id
-        )
+        latest = yield self.store.get_latest_event_ids_in_room(room_id)
 
         # We add the prev events that we have seen to the latest
         # list to ensure the remote server doesn't give them to us
@@ -315,8 +447,8 @@ class FederationHandler(BaseHandler):
         latest |= seen
 
         logger.info(
-            "Missing %d events for room %r pdu %s: %r...",
-            len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+            "[%s %s]: Requesting missing events between %s and %s",
+            room_id, event_id, shortstr(latest), event_id,
         )
 
         # XXX: we set timeout to 10s to help workaround
@@ -337,132 +469,144 @@ class FederationHandler(BaseHandler):
         # apparently.
         #
         # see https://github.com/matrix-org/synapse/pull/1744
+        #
+        # ----
+        #
+        # Update richvdh 2018/09/18: There are a number of problems with timing this
+        # request out aggressively on the client side:
+        #
+        # - it plays badly with the server-side rate-limiter, which starts tarpitting you
+        #   if you send too many requests at once, so you end up with the server carefully
+        #   working through the backlog of your requests, which you have already timed
+        #   out.
+        #
+        # - for this request in particular, we now (as of
+        #   https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
+        #   server can't produce a plausible-looking set of prev_events - so we become
+        #   much more likely to reject the event.
+        #
+        # - contrary to what it says above, we do *not* fall back to fetching fresh state
+        #   for the room if get_missing_events times out. Rather, we give up processing
+        #   the PDU whose prevs we are missing, which then makes it much more likely that
+        #   we'll end up back here for the *next* PDU in the list, which exacerbates the
+        #   problem.
+        #
+        # - the aggressive 10s timeout was introduced to deal with incoming federation
+        #   requests taking 8 hours to process. It's not entirely clear why that was going
+        #   on; certainly there were other issues causing traffic storms which are now
+        #   resolved, and I think in any case we may be more sensible about our locking
+        #   now. We're *certainly* more sensible about our logging.
+        #
+        # All that said: Let's try increasing the timeout to 60s and see what happens.
 
         missing_events = yield self.federation_client.get_missing_events(
             origin,
-            pdu.room_id,
+            room_id,
             earliest_events_ids=list(latest),
             latest_events=[pdu],
             limit=10,
             min_depth=min_depth,
-            timeout=10000,
+            timeout=60000,
         )
 
         logger.info(
-            "Got %d events: %r...",
-            len(missing_events), [e.event_id for e in missing_events[:5]]
+            "[%s %s]: Got %d prev_events: %s",
+            room_id, event_id, len(missing_events), shortstr(missing_events),
         )
 
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
         missing_events.sort(key=lambda x: x.depth)
 
-        for e in missing_events:
-            logger.info("Handling found event %s", e.event_id)
-            try:
-                yield self.on_receive_pdu(
-                    origin,
-                    e,
-                    get_missing=False
-                )
-            except FederationError as e:
-                if e.code == 403:
-                    logger.warn("Event %s failed history check.")
-                else:
-                    raise
+        for ev in missing_events:
+            logger.info(
+                "[%s %s] Handling received prev_event %s",
+                room_id, event_id, ev.event_id,
+            )
+            with logcontext.nested_logging_context(ev.event_id):
+                try:
+                    yield self.on_receive_pdu(
+                        origin,
+                        ev,
+                        sent_to_us_directly=False,
+                    )
+                except FederationError as e:
+                    if e.code == 403:
+                        logger.warn(
+                            "[%s %s] Received prev_event %s failed history check.",
+                            room_id, event_id, ev.event_id,
+                        )
+                    else:
+                        raise
 
-    @log_function
     @defer.inlineCallbacks
-    def _process_received_pdu(self, origin, pdu, state, auth_chain):
+    def _process_received_pdu(self, origin, event, state, auth_chain):
         """ Called when we have a new pdu. We need to do auth checks and put it
         through the StateHandler.
         """
-        event = pdu
+        room_id = event.room_id
+        event_id = event.event_id
 
-        logger.debug("Processing event: %s", event)
+        logger.debug(
+            "[%s %s] Processing event: %s",
+            room_id, event_id, event,
+        )
 
-        # FIXME (erikj): Awful hack to make the case where we are not currently
-        # in the room work
-        # If state and auth_chain are None, then we don't need to do this check
-        # as we already know we have enough state in the DB to handle this
-        # event.
-        if state and auth_chain and not event.internal_metadata.is_outlier():
-            is_in_room = yield self.auth.check_host_in_room(
-                event.room_id,
-                self.server_name
-            )
-        else:
-            is_in_room = True
-        if not is_in_room:
-            logger.info(
-                "Got event for room we're not in: %r %r",
-                event.room_id, event.event_id
-            )
+        event_ids = set()
+        if state:
+            event_ids |= {e.event_id for e in state}
+        if auth_chain:
+            event_ids |= {e.event_id for e in auth_chain}
 
-            try:
-                yield self._persist_auth_tree(
-                    origin, auth_chain, state, event
-                )
-            except AuthError as e:
-                raise FederationError(
-                    "ERROR",
-                    e.code,
-                    e.msg,
-                    affected=event.event_id,
-                )
+        seen_ids = yield self.store.have_seen_events(event_ids)
 
-        else:
-            event_ids = set()
-            if state:
-                event_ids |= {e.event_id for e in state}
-            if auth_chain:
-                event_ids |= {e.event_id for e in auth_chain}
+        if state and auth_chain is not None:
+            # If we have any state or auth_chain given to us by the replication
+            # layer, then we should handle them (if we haven't before.)
 
-            seen_ids = yield self.store.have_seen_events(event_ids)
+            event_infos = []
 
-            if state and auth_chain is not None:
-                # If we have any state or auth_chain given to us by the replication
-                # layer, then we should handle them (if we haven't before.)
-
-                event_infos = []
+            for e in itertools.chain(auth_chain, state):
+                if e.event_id in seen_ids:
+                    continue
+                e.internal_metadata.outlier = True
+                auth_ids = e.auth_event_ids()
+                auth = {
+                    (e.type, e.state_key): e for e in auth_chain
+                    if e.event_id in auth_ids or e.type == EventTypes.Create
+                }
+                event_infos.append({
+                    "event": e,
+                    "auth_events": auth,
+                })
+                seen_ids.add(e.event_id)
 
-                for e in itertools.chain(auth_chain, state):
-                    if e.event_id in seen_ids:
-                        continue
-                    e.internal_metadata.outlier = True
-                    auth_ids = [e_id for e_id, _ in e.auth_events]
-                    auth = {
-                        (e.type, e.state_key): e for e in auth_chain
-                        if e.event_id in auth_ids or e.type == EventTypes.Create
-                    }
-                    event_infos.append({
-                        "event": e,
-                        "auth_events": auth,
-                    })
-                    seen_ids.add(e.event_id)
-
-                yield self._handle_new_events(origin, event_infos)
+            logger.info(
+                "[%s %s] persisting newly-received auth/state events %s",
+                room_id, event_id, [e["event"].event_id for e in event_infos]
+            )
+            yield self._handle_new_events(origin, event_infos)
 
-            try:
-                context = yield self._handle_new_event(
-                    origin,
-                    event,
-                    state=state,
-                )
-            except AuthError as e:
-                raise FederationError(
-                    "ERROR",
-                    e.code,
-                    e.msg,
-                    affected=event.event_id,
-                )
+        try:
+            context = yield self._handle_new_event(
+                origin,
+                event,
+                state=state,
+            )
+        except AuthError as e:
+            raise FederationError(
+                "ERROR",
+                e.code,
+                e.msg,
+                affected=event.event_id,
+            )
 
-        room = yield self.store.get_room(event.room_id)
+        room = yield self.store.get_room(room_id)
 
         if not room:
             try:
                 yield self.store.store_room(
-                    room_id=event.room_id,
+                    room_id=room_id,
                     room_creator_user_id="",
                     is_public=False,
                 )
@@ -490,7 +634,7 @@ class FederationHandler(BaseHandler):
 
                 if newly_joined:
                     user = UserID.from_string(event.state_key)
-                    yield self.user_joined_room(user, event.room_id)
+                    yield self.user_joined_room(user, room_id)
 
     @log_function
     @defer.inlineCallbacks
@@ -545,7 +689,7 @@ class FederationHandler(BaseHandler):
         edges = [
             ev.event_id
             for ev in events
-            if set(e_id for e_id, _ in ev.prev_events) - event_ids
+            if set(ev.prev_event_ids()) - event_ids
         ]
 
         logger.info(
@@ -571,8 +715,8 @@ class FederationHandler(BaseHandler):
 
         required_auth = set(
             a_id
-            for event in events + state_events.values() + auth_events.values()
-            for a_id, _ in event.auth_events
+            for event in events + list(state_events.values()) + list(auth_events.values())
+            for a_id in event.auth_event_ids()
         )
         auth_events.update({
             e_id: event_map[e_id] for e_id in required_auth if e_id in event_map
@@ -588,7 +732,7 @@ class FederationHandler(BaseHandler):
             auth_events.update(ret_events)
 
             required_auth.update(
-                a_id for event in ret_events.values() for a_id, _ in event.auth_events
+                a_id for event in ret_events.values() for a_id in event.auth_event_ids()
             )
             missing_auth = required_auth - set(auth_events)
 
@@ -615,7 +759,7 @@ class FederationHandler(BaseHandler):
                 required_auth.update(
                     a_id
                     for event in results if event
-                    for a_id, _ in event.auth_events
+                    for a_id in event.auth_event_ids()
                 )
                 missing_auth = required_auth - set(auth_events)
 
@@ -635,7 +779,7 @@ class FederationHandler(BaseHandler):
                 "auth_events": {
                     (auth_events[a_id].type, auth_events[a_id].state_key):
                     auth_events[a_id]
-                    for a_id, _ in a.auth_events
+                    for a_id in a.auth_event_ids()
                     if a_id in auth_events
                 }
             })
@@ -647,7 +791,7 @@ class FederationHandler(BaseHandler):
                 "auth_events": {
                     (auth_events[a_id].type, auth_events[a_id].state_key):
                     auth_events[a_id]
-                    for a_id, _ in event_map[e_id].auth_events
+                    for a_id in event_map[e_id].auth_event_ids()
                     if a_id in auth_events
                 }
             })
@@ -779,7 +923,7 @@ class FederationHandler(BaseHandler):
                     )
                     continue
                 except NotRetryingDestination as e:
-                    logger.info(e.message)
+                    logger.info(str(e))
                     continue
                 except FederationDeniedError as e:
                     logger.info(e)
@@ -860,17 +1004,17 @@ class FederationHandler(BaseHandler):
         Raises:
             SynapseError if the event does not pass muster
         """
-        if len(ev.prev_events) > 20:
+        if len(ev.prev_event_ids()) > 20:
             logger.warn("Rejecting event %s which has %i prev_events",
-                        ev.event_id, len(ev.prev_events))
+                        ev.event_id, len(ev.prev_event_ids()))
             raise SynapseError(
                 http_client.BAD_REQUEST,
                 "Too many prev_events",
             )
 
-        if len(ev.auth_events) > 10:
+        if len(ev.auth_event_ids()) > 10:
             logger.warn("Rejecting event %s which has %i auth_events",
-                        ev.event_id, len(ev.auth_events))
+                        ev.event_id, len(ev.auth_event_ids()))
             raise SynapseError(
                 http_client.BAD_REQUEST,
                 "Too many auth_events",
@@ -895,7 +1039,7 @@ class FederationHandler(BaseHandler):
     def on_event_auth(self, event_id):
         event = yield self.store.get_event(event_id)
         auth = yield self.store.get_auth_chain(
-            [auth_id for auth_id, _ in event.auth_events],
+            [auth_id for auth_id in event.auth_event_ids()],
             include_given=True
         )
         defer.returnValue([e for e in auth])
@@ -922,6 +1066,9 @@ class FederationHandler(BaseHandler):
             joinee,
             "join",
             content,
+            params={
+                "ver": KNOWN_ROOM_VERSIONS,
+            },
         )
 
         # This shouldn't happen, because the RoomMemberHandler has a
@@ -1001,7 +1148,8 @@ class FederationHandler(BaseHandler):
             try:
                 logger.info("Processing queued PDU %s which was received "
                             "while we were joining %s", p.event_id, p.room_id)
-                yield self.on_receive_pdu(origin, p)
+                with logcontext.nested_logging_context(p.event_id):
+                    yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
             except Exception as e:
                 logger.warn(
                     "Error handling queued PDU %s from %s: %s",
@@ -1150,7 +1298,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = yield self.state_handler.compute_event_context(event)
-        yield self._persist_events([(event, context)])
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
@@ -1181,19 +1329,20 @@ class FederationHandler(BaseHandler):
         )
 
         context = yield self.state_handler.compute_event_context(event)
-        yield self._persist_events([(event, context)])
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
     @defer.inlineCallbacks
     def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
-                               content={},):
+                               content={}, params=None):
         origin, pdu = yield self.federation_client.make_membership_event(
             target_hosts,
             room_id,
             user_id,
             membership,
             content,
+            params=params,
         )
 
         logger.debug("Got response to make_%s: %s", membership, pdu)
@@ -1331,7 +1480,7 @@ class FederationHandler(BaseHandler):
         )
 
         if state_groups:
-            _, state = state_groups.items().pop()
+            _, state = list(state_groups.items()).pop()
             results = state
 
             if event.is_state():
@@ -1403,12 +1552,10 @@ class FederationHandler(BaseHandler):
         else:
             defer.returnValue(None)
 
-    @log_function
     def get_min_depth_for_context(self, context):
         return self.store.get_min_depth(context)
 
     @defer.inlineCallbacks
-    @log_function
     def _handle_new_event(self, origin, event, state=None, auth_events=None,
                           backfilled=False):
         context = yield self._prep_event(
@@ -1417,25 +1564,26 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
+        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+        # hack around with a try/finally instead.
+        success = False
         try:
             if not event.internal_metadata.is_outlier() and not backfilled:
                 yield self.action_generator.handle_push_actions_for_event(
                     event, context
                 )
 
-            yield self._persist_events(
+            yield self.persist_events_and_notify(
                 [(event, context)],
                 backfilled=backfilled,
             )
-        except:  # noqa: E722, as we reraise the exception this is fine.
-            tp, value, tb = sys.exc_info()
-
-            logcontext.run_in_background(
-                self.store.remove_push_actions_from_staging,
-                event.event_id,
-            )
-
-            six.reraise(tp, value, tb)
+            success = True
+        finally:
+            if not success:
+                logcontext.run_in_background(
+                    self.store.remove_push_actions_from_staging,
+                    event.event_id,
+                )
 
         defer.returnValue(context)
 
@@ -1448,20 +1596,27 @@ class FederationHandler(BaseHandler):
 
         Notifies about the events where appropriate.
         """
-        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                logcontext.run_in_background(
-                    self._prep_event,
+
+        @defer.inlineCallbacks
+        def prep(ev_info):
+            event = ev_info["event"]
+            with logcontext.nested_logging_context(suffix=event.event_id):
+                res = yield self._prep_event(
                     origin,
-                    ev_info["event"],
+                    event,
                     state=ev_info.get("state"),
                     auth_events=ev_info.get("auth_events"),
                 )
+            defer.returnValue(res)
+
+        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+            [
+                logcontext.run_in_background(prep, ev_info)
                 for ev_info in event_infos
             ], consumeErrors=True,
         ))
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [
                 (ev_info["event"], context)
                 for ev_info, context in zip(event_infos, contexts)
@@ -1506,7 +1661,7 @@ class FederationHandler(BaseHandler):
 
         missing_auth_events = set()
         for e in itertools.chain(auth_events, state, [event]):
-            for e_id, _ in e.auth_events:
+            for e_id in e.auth_event_ids():
                 if e_id not in event_map:
                     missing_auth_events.add(e_id)
 
@@ -1525,7 +1680,7 @@ class FederationHandler(BaseHandler):
         for e in itertools.chain(auth_events, state, [event]):
             auth_for_e = {
                 (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
-                for e_id, _ in e.auth_events
+                for e_id in e.auth_event_ids()
                 if e_id in event_map
             }
             if create_event:
@@ -1549,7 +1704,7 @@ class FederationHandler(BaseHandler):
                     raise
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [
                 (e, events_to_context[e.event_id])
                 for e in itertools.chain(auth_events, state)
@@ -1560,7 +1715,7 @@ class FederationHandler(BaseHandler):
             event, old_state=state
         )
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [(event, new_event_context)],
         )
 
@@ -1593,10 +1748,10 @@ class FederationHandler(BaseHandler):
 
         # This is a hack to fix some old rooms where the initial join event
         # didn't reference the create event in its auth events.
-        if event.type == EventTypes.Member and not event.auth_events:
-            if len(event.prev_events) == 1 and event.depth < 5:
+        if event.type == EventTypes.Member and not event.auth_event_ids():
+            if len(event.prev_event_ids()) == 1 and event.depth < 5:
                 c = yield self.store.get_event(
-                    event.prev_events[0][0],
+                    event.prev_event_ids()[0],
                     allow_none=True,
                 )
                 if c and c.type == EventTypes.Create:
@@ -1608,8 +1763,8 @@ class FederationHandler(BaseHandler):
             )
         except AuthError as e:
             logger.warn(
-                "Rejecting %s because %s",
-                event.event_id, e.msg
+                "[%s %s] Rejecting: %s",
+                event.room_id, event.event_id, e.msg
             )
 
             context.rejected = RejectedReason.AUTH_ERROR
@@ -1643,7 +1798,7 @@ class FederationHandler(BaseHandler):
 
         # Now get the current auth_chain for the event.
         local_auth_chain = yield self.store.get_auth_chain(
-            [auth_id for auth_id, _ in event.auth_events],
+            [auth_id for auth_id in event.auth_event_ids()],
             include_given=True
         )
 
@@ -1660,7 +1815,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def on_get_missing_events(self, origin, room_id, earliest_events,
-                              latest_events, limit, min_depth):
+                              latest_events, limit):
         in_room = yield self.auth.check_host_in_room(
             room_id,
             origin
@@ -1669,14 +1824,12 @@ class FederationHandler(BaseHandler):
             raise AuthError(403, "Host not in room.")
 
         limit = min(limit, 20)
-        min_depth = max(min_depth, 0)
 
         missing_events = yield self.store.get_missing_events(
             room_id=room_id,
             earliest_events=earliest_events,
             latest_events=latest_events,
             limit=limit,
-            min_depth=min_depth,
         )
 
         missing_events = yield filter_events_for_server(
@@ -1701,7 +1854,7 @@ class FederationHandler(BaseHandler):
         """
         # Check if we have all the auth events.
         current_state = set(e.event_id for e in auth_events.values())
-        event_auth_events = set(e_id for e_id, _ in event.auth_events)
+        event_auth_events = set(event.auth_event_ids())
 
         if event.is_state():
             event_key = (event.type, event.state_key)
@@ -1745,7 +1898,7 @@ class FederationHandler(BaseHandler):
                         continue
 
                     try:
-                        auth_ids = [e_id for e_id, _ in e.auth_events]
+                        auth_ids = e.auth_event_ids()
                         auth = {
                             (e.type, e.state_key): e for e in remote_auth_chain
                             if e.event_id in auth_ids or e.type == EventTypes.Create
@@ -1766,7 +1919,7 @@ class FederationHandler(BaseHandler):
                         pass
 
                 have_events = yield self.store.get_seen_events_with_rejections(
-                    [e_id for e_id, _ in event.auth_events]
+                    event.auth_event_ids()
                 )
                 seen_events = set(have_events.keys())
             except Exception:
@@ -1802,7 +1955,10 @@ class FederationHandler(BaseHandler):
                     (d.type, d.state_key): d for d in different_events if d
                 })
 
-                new_state = self.state_handler.resolve_events(
+                room_version = yield self.store.get_room_version(event.room_id)
+
+                new_state = yield self.state_handler.resolve_events(
+                    room_version,
                     [list(local_view.values()), list(remote_view.values())],
                     event
                 )
@@ -1865,7 +2021,7 @@ class FederationHandler(BaseHandler):
                             continue
 
                         try:
-                            auth_ids = [e_id for e_id, _ in ev.auth_events]
+                            auth_ids = ev.auth_event_ids()
                             auth = {
                                 (e.type, e.state_key): e
                                 for e in result["auth_chain"]
@@ -2057,7 +2213,7 @@ class FederationHandler(BaseHandler):
         missing_remote_ids = [e.event_id for e in missing_remotes]
         base_remote_rejected = list(missing_remotes)
         for e in missing_remotes:
-            for e_id, _ in e.auth_events:
+            for e_id in e.auth_event_ids():
                 if e_id in missing_remote_ids:
                     try:
                         base_remote_rejected.remove(e)
@@ -2288,7 +2444,7 @@ class FederationHandler(BaseHandler):
                 for revocation.
         """
         try:
-            response = yield self.hs.get_simple_http_client().get_json(
+            response = yield self.http_client.get_json(
                 url,
                 {"public_key": public_key}
             )
@@ -2301,7 +2457,7 @@ class FederationHandler(BaseHandler):
             raise AuthError(403, "Third party certificate was invalid")
 
     @defer.inlineCallbacks
-    def _persist_events(self, event_and_contexts, backfilled=False):
+    def persist_events_and_notify(self, event_and_contexts, backfilled=False):
         """Persists events and tells the notifier/pushers about them, if
         necessary.
 
@@ -2313,14 +2469,21 @@ class FederationHandler(BaseHandler):
         Returns:
             Deferred
         """
-        max_stream_id = yield self.store.persist_events(
-            event_and_contexts,
-            backfilled=backfilled,
-        )
+        if self.config.worker_app:
+            yield self._send_events_to_master(
+                store=self.store,
+                event_and_contexts=event_and_contexts,
+                backfilled=backfilled
+            )
+        else:
+            max_stream_id = yield self.store.persist_events(
+                event_and_contexts,
+                backfilled=backfilled,
+            )
 
-        if not backfilled:  # Never notify for backfilled events
-            for event, _ in event_and_contexts:
-                self._notify_persisted_event(event, max_stream_id)
+            if not backfilled:  # Never notify for backfilled events
+                for event, _ in event_and_contexts:
+                    yield self._notify_persisted_event(event, max_stream_id)
 
     def _notify_persisted_event(self, event, max_stream_id):
         """Checks to see if notifier/pushers should be notified about the
@@ -2353,15 +2516,30 @@ class FederationHandler(BaseHandler):
             extra_users=extra_users
         )
 
-        logcontext.run_in_background(
-            self.pusher_pool.on_new_notifications,
+        return self.pusher_pool.on_new_notifications(
             event_stream_id, max_stream_id,
         )
 
     def _clean_room_for_join(self, room_id):
-        return self.store.clean_room_for_join(room_id)
+        """Called to clean up any data in DB for a given room, ready for the
+        server to join the room.
+
+        Args:
+            room_id (str)
+        """
+        if self.config.worker_app:
+            return self._clean_room_for_join_client(room_id)
+        else:
+            return self.store.clean_room_for_join(room_id)
 
     def user_joined_room(self, user, room_id):
         """Called when a new user has joined the room
         """
-        return user_joined_room(self.distributor, user, room_id)
+        if self.config.worker_app:
+            return self._notify_user_membership_change(
+                room_id=room_id,
+                user_id=user.to_string(),
+                change="joined",
+            )
+        else:
+            return user_joined_room(self.distributor, user, room_id)
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 53e5e2648b..173315af6c 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.types import get_domain_from_id
 
 logger = logging.getLogger(__name__)
@@ -37,9 +37,23 @@ def _create_rerouter(func_name):
             )
         else:
             destination = get_domain_from_id(group_id)
-            return getattr(self.transport_client, func_name)(
+            d = getattr(self.transport_client, func_name)(
                 destination, group_id, *args, **kwargs
             )
+
+            # Capture errors returned by the remote homeserver and
+            # re-throw specific errors as SynapseErrors. This is so
+            # when the remote end responds with things like 403 Not
+            # In Group, we can communicate that to the client instead
+            # of a 500.
+            def h(failure):
+                failure.trap(HttpResponseException)
+                e = failure.value
+                if e.code == 403:
+                    raise e.to_synapse_error()
+                return failure
+            d.addErrback(h)
+            return d
     return f
 
 
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 1d36d967c3..5feb3f22a6 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -137,15 +137,19 @@ class IdentityHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def unbind_threepid(self, mxid, threepid):
-        """
-        Removes a binding from an identity server
+    def try_unbind_threepid(self, mxid, threepid):
+        """Removes a binding from an identity server
+
         Args:
             mxid (str): Matrix user ID of binding to be removed
             threepid (dict): Dict with medium & address of binding to be removed
 
+        Raises:
+            SynapseError: If we failed to contact the identity server
+
         Returns:
-            Deferred[bool]: True on success, otherwise False
+            Deferred[bool]: True on success, otherwise False if the identity
+            server doesn't support unbinding
         """
         logger.debug("unbinding threepid %r from %s", threepid, mxid)
         if not self.trusted_id_servers:
@@ -175,11 +179,21 @@ class IdentityHandler(BaseHandler):
             content=content,
             destination_is=id_server,
         )
-        yield self.http_client.post_json_get_json(
-            url,
-            content,
-            headers,
-        )
+        try:
+            yield self.http_client.post_json_get_json(
+                url,
+                content,
+                headers,
+            )
+        except HttpResponseException as e:
+            if e.code in (400, 404, 501,):
+                # The remote server probably doesn't support unbinding (yet)
+                logger.warn("Received %d response while unbinding threepid", e.code)
+                defer.returnValue(False)
+            else:
+                logger.error("Failed to unbind threepid on identity server: %s", e)
+                raise SynapseError(502, "Failed to contact identity server")
+
         defer.returnValue(True)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 40e7580a61..563bb3cea3 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state
 from synapse.streams.config import PaginationConfig
 from synapse.types import StreamToken, UserID
 from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.visibility import filter_events_for_client
@@ -156,7 +156,7 @@ class InitialSyncHandler(BaseHandler):
                     room_end_token = "s%d" % (event.stream_ordering,)
                     deferred_room_state = run_in_background(
                         self.store.get_state_for_events,
-                        [event.event_id], None,
+                        [event.event_id],
                     )
                     deferred_room_state.addCallback(
                         lambda states: states[event.event_id]
@@ -301,7 +301,7 @@ class InitialSyncHandler(BaseHandler):
     def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
                                   membership, member_event_id, is_peeking):
         room_state = yield self.store.get_state_for_events(
-            [member_event_id], None
+            [member_event_id],
         )
 
         room_state = room_state[member_event_id]
@@ -372,6 +372,10 @@ class InitialSyncHandler(BaseHandler):
 
         @defer.inlineCallbacks
         def get_presence():
+            # If presence is disabled, return an empty list
+            if not self.hs.config.use_presence:
+                defer.returnValue([])
+
             states = yield presence_handler.get_states(
                 [m.user_id for m in room_members],
                 as_event=True,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 39d7724778..a7cd779b02 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,9 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import sys
 
-import six
 from six import iteritems, itervalues, string_types
 
 from canonicaljson import encode_canonical_json, json
@@ -25,17 +23,25 @@ from twisted.internet import defer
 from twisted.internet.defer import succeed
 
 from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    Codes,
+    ConsentNotGivenError,
+    NotFoundError,
+    SynapseError,
+)
 from synapse.api.urls import ConsentURIBuilder
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
-from synapse.replication.http.send_event import send_event_to_master
+from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, UserID
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
@@ -75,35 +81,87 @@ class MessageHandler(object):
         elif membership == Membership.LEAVE:
             key = (event_type, state_key)
             room_state = yield self.store.get_state_for_events(
-                [membership_event_id], [key]
+                [membership_event_id], StateFilter.from_types([key])
             )
             data = room_state[membership_event_id].get(key)
 
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def get_state_events(self, user_id, room_id, is_guest=False):
+    def get_state_events(
+        self, user_id, room_id, state_filter=StateFilter.all(),
+        at_token=None, is_guest=False,
+    ):
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
-        left the room return the state events from when they left.
+        left the room return the state events from when they left. If an explicit
+        'at' parameter is passed, return the state events as of that event, if
+        visible.
 
         Args:
             user_id(str): The user requesting state events.
             room_id(str): The room ID to get all state events from.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
+            at_token(StreamToken|None): the stream token at which we are requesting
+                the state. If the user is not allowed to view the state as of that
+                stream token, we raise a 403 SynapseError. If None, returns the current
+                state based on the current_state_events table.
+            is_guest(bool): whether this user is a guest
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
+        Raises:
+            NotFoundError (404) if at_token does not yield an event
+
+            AuthError (403) if the user doesn't have permission to view
+            members of this room.
         """
-        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
-            room_id, user_id
-        )
+        if at_token:
+            # FIXME this claims to get the state at a stream position, but
+            # get_recent_events_for_room operates by topo ordering. This therefore
+            # does not reliably give you the state at the given stream position.
+            # (https://github.com/matrix-org/synapse/issues/3305)
+            last_events, _ = yield self.store.get_recent_events_for_room(
+                room_id, end_token=at_token.room_key, limit=1,
+            )
 
-        if membership == Membership.JOIN:
-            room_state = yield self.state.get_current_state(room_id)
-        elif membership == Membership.LEAVE:
-            room_state = yield self.store.get_state_for_events(
-                [membership_event_id], None
+            if not last_events:
+                raise NotFoundError("Can't find event for token %s" % (at_token, ))
+
+            visible_events = yield filter_events_for_client(
+                self.store, user_id, last_events,
+            )
+
+            event = last_events[0]
+            if visible_events:
+                room_state = yield self.store.get_state_for_events(
+                    [event.event_id], state_filter=state_filter,
+                )
+                room_state = room_state[event.event_id]
+            else:
+                raise AuthError(
+                    403,
+                    "User %s not allowed to view events in room %s at token %s" % (
+                        user_id, room_id, at_token,
+                    )
+                )
+        else:
+            membership, membership_event_id = (
+                yield self.auth.check_in_room_or_world_readable(
+                    room_id, user_id,
+                )
             )
-            room_state = room_state[membership_event_id]
+
+            if membership == Membership.JOIN:
+                state_ids = yield self.store.get_filtered_current_state_ids(
+                    room_id, state_filter=state_filter,
+                )
+                room_state = yield self.store.get_events(state_ids.values())
+            elif membership == Membership.LEAVE:
+                room_state = yield self.store.get_state_for_events(
+                    [membership_event_id], state_filter=state_filter,
+                )
+                room_state = room_state[membership_event_id]
 
         now = self.clock.time_msec()
         defer.returnValue(
@@ -171,7 +229,7 @@ class EventCreationHandler(object):
         self.notifier = hs.get_notifier()
         self.config = hs.config
 
-        self.http_client = hs.get_simple_http_client()
+        self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -212,10 +270,14 @@ class EventCreationHandler(object):
                 where *hashes* is a map from algorithm to hash.
 
                 If None, they will be requested from the database.
-
+        Raises:
+            ResourceLimitError if server is blocked due to some resource being
+            exceeded
         Returns:
             Tuple of created event (FrozenEvent), Context
         """
+        yield self.auth.check_auth_blocking(requester.user.to_string())
+
         builder = self.event_builder_factory.new(event_dict)
 
         self.validator.validate_new(builder)
@@ -365,6 +427,9 @@ class EventCreationHandler(object):
 
         if event.is_state():
             prev_state = yield self.deduplicate_state_event(event, context)
+            logger.info(
+                "Not bothering to persist duplicate state event %s", event.event_id,
+            )
             if prev_state is not None:
                 defer.returnValue(prev_state)
 
@@ -556,21 +621,22 @@ class EventCreationHandler(object):
             event, context
         )
 
+        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+        # hack around with a try/finally instead.
+        success = False
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
-                yield send_event_to_master(
-                    clock=self.hs.get_clock(),
+                yield self.send_event_to_master(
+                    event_id=event.event_id,
                     store=self.store,
-                    client=self.http_client,
-                    host=self.config.worker_replication_host,
-                    port=self.config.worker_replication_http_port,
                     requester=requester,
                     event=event,
                     context=context,
                     ratelimit=ratelimit,
                     extra_users=extra_users,
                 )
+                success = True
                 return
 
             yield self.persist_and_notify_client_event(
@@ -580,17 +646,16 @@ class EventCreationHandler(object):
                 ratelimit=ratelimit,
                 extra_users=extra_users,
             )
-        except:  # noqa: E722, as we reraise the exception this is fine.
-            # Ensure that we actually remove the entries in the push actions
-            # staging area, if we calculated them.
-            tp, value, tb = sys.exc_info()
-
-            run_in_background(
-                self.store.remove_push_actions_from_staging,
-                event.event_id,
-            )
 
-            six.reraise(tp, value, tb)
+            success = True
+        finally:
+            if not success:
+                # Ensure that we actually remove the entries in the push actions
+                # staging area, if we calculated them.
+                run_in_background(
+                    self.store.remove_push_actions_from_staging,
+                    event.event_id,
+                )
 
     @defer.inlineCallbacks
     def persist_and_notify_client_event(
@@ -713,11 +778,8 @@ class EventCreationHandler(object):
             event, context=context
         )
 
-        # this intentionally does not yield: we don't care about the result
-        # and don't need to wait for it.
-        run_in_background(
-            self.pusher_pool.on_new_notifications,
-            event_stream_id, max_stream_id
+        yield self.pusher_pool.on_new_notifications(
+            event_stream_id, max_stream_id,
         )
 
         def _notify():
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index b2849783ed..43f81bd607 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -18,11 +18,12 @@ import logging
 from twisted.internet import defer
 from twisted.python.failure import Failure
 
-from synapse.api.constants import Membership
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.events.utils import serialize_event
+from synapse.storage.state import StateFilter
 from synapse.types import RoomStreamToken
-from synapse.util.async import ReadWriteLock
+from synapse.util.async_helpers import ReadWriteLock
 from synapse.util.logcontext import run_in_background
 from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
@@ -251,6 +252,24 @@ class PaginationHandler(object):
             is_peeking=(member_event_id is None),
         )
 
+        state = None
+        if event_filter and event_filter.lazy_load_members():
+            # TODO: remove redundant members
+
+            # FIXME: we also care about invite targets etc.
+            state_filter = StateFilter.from_types(
+                (EventTypes.Member, event.sender)
+                for event in events
+            )
+
+            state_ids = yield self.store.get_state_ids_for_event(
+                events[0].event_id, state_filter=state_filter,
+            )
+
+            if state_ids:
+                state = yield self.store.get_events(list(state_ids.values()))
+                state = state.values()
+
         time_now = self.clock.time_msec()
 
         chunk = {
@@ -262,4 +281,10 @@ class PaginationHandler(object):
             "end": next_token.to_string(),
         }
 
+        if state:
+            chunk["state"] = [
+                serialize_event(e, time_now, as_client_event)
+                for e in state
+            ]
+
         defer.returnValue(chunk)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 3732830194..ba3856674d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -36,7 +36,7 @@ from synapse.api.errors import SynapseError
 from synapse.metrics import LaterGauge
 from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, get_domain_from_id
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.logcontext import run_in_background
 from synapse.util.logutils import log_function
@@ -95,6 +95,7 @@ class PresenceHandler(object):
         Args:
             hs (synapse.server.HomeServer):
         """
+        self.hs = hs
         self.is_mine = hs.is_mine
         self.is_mine_id = hs.is_mine_id
         self.clock = hs.get_clock()
@@ -230,6 +231,10 @@ class PresenceHandler(object):
         earlier than they should when synapse is restarted. This affect of this
         is some spurious presence changes that will self-correct.
         """
+        # If the DB pool has already terminated, don't try updating
+        if not self.hs.get_db_pool().running:
+            return
+
         logger.info(
             "Performing _on_shutdown. Persisting %d unpersisted changes",
             len(self.user_to_current_state)
@@ -390,6 +395,10 @@ class PresenceHandler(object):
         """We've seen the user do something that indicates they're interacting
         with the app.
         """
+        # If presence is disabled, no-op
+        if not self.hs.config.use_presence:
+            return
+
         user_id = user.to_string()
 
         bump_active_time_counter.inc()
@@ -419,6 +428,11 @@ class PresenceHandler(object):
                 Useful for streams that are not associated with an actual
                 client that is being used by a user.
         """
+        # If presence is disabled, override affect_presence so that this sync
+        # never affects the user's presence.
+        if not self.hs.config.use_presence:
+            affect_presence = False
+
         if affect_presence:
             curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
             self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -464,13 +478,16 @@ class PresenceHandler(object):
         Returns:
             set(str): A set of user_id strings.
         """
-        syncing_user_ids = {
-            user_id for user_id, count in self.user_to_num_current_syncs.items()
-            if count
-        }
-        for user_ids in self.external_process_to_current_syncs.values():
-            syncing_user_ids.update(user_ids)
-        return syncing_user_ids
+        if self.hs.config.use_presence:
+            syncing_user_ids = {
+                user_id for user_id, count in self.user_to_num_current_syncs.items()
+                if count
+            }
+            for user_ids in self.external_process_to_current_syncs.values():
+                syncing_user_ids.update(user_ids)
+            return syncing_user_ids
+        else:
+            return set()
 
     @defer.inlineCallbacks
     def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 9af2e8f869..1dfbde84fd 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -32,12 +32,16 @@ from ._base import BaseHandler
 logger = logging.getLogger(__name__)
 
 
-class ProfileHandler(BaseHandler):
-    PROFILE_UPDATE_MS = 60 * 1000
-    PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
+class BaseProfileHandler(BaseHandler):
+    """Handles fetching and updating user profile information.
+
+    BaseProfileHandler can be instantiated directly on workers and will
+    delegate to master when necessary. The master process should use the
+    subclass MasterProfileHandler
+    """
 
     def __init__(self, hs):
-        super(ProfileHandler, self).__init__(hs)
+        super(BaseProfileHandler, self).__init__(hs)
 
         self.federation = hs.get_federation_client()
         hs.get_federation_registry().register_query_handler(
@@ -46,11 +50,6 @@ class ProfileHandler(BaseHandler):
 
         self.user_directory_handler = hs.get_user_directory_handler()
 
-        if hs.config.worker_app is None:
-            self.clock.looping_call(
-                self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
-            )
-
     @defer.inlineCallbacks
     def get_profile(self, user_id):
         target_user = UserID.from_string(user_id)
@@ -143,10 +142,8 @@ class ProfileHandler(BaseHandler):
                 if e.code != 404:
                     logger.exception("Failed to get displayname")
                 raise
-            except Exception:
-                logger.exception("Failed to get displayname")
-            else:
-                defer.returnValue(result["displayname"])
+
+            defer.returnValue(result["displayname"])
 
     @defer.inlineCallbacks
     def set_displayname(self, target_user, requester, new_displayname, by_admin=False):
@@ -200,8 +197,6 @@ class ProfileHandler(BaseHandler):
                 if e.code != 404:
                     logger.exception("Failed to get avatar_url")
                 raise
-            except Exception:
-                logger.exception("Failed to get avatar_url")
 
             defer.returnValue(result["avatar_url"])
 
@@ -279,9 +274,23 @@ class ProfileHandler(BaseHandler):
             except Exception as e:
                 logger.warn(
                     "Failed to update join event for room %s - %s",
-                    room_id, str(e.message)
+                    room_id, str(e)
                 )
 
+
+class MasterProfileHandler(BaseProfileHandler):
+    PROFILE_UPDATE_MS = 60 * 1000
+    PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
+
+    def __init__(self, hs):
+        super(MasterProfileHandler, self).__init__(hs)
+
+        assert hs.config.worker_app is None
+
+        self.clock.looping_call(
+            self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+        )
+
     def _start_update_remote_profile_cache(self):
         return run_as_background_process(
             "Update remote profile", self._update_remote_profile_cache,
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 995460f82a..32108568c6 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -17,7 +17,7 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 
 from ._base import BaseHandler
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index cb905a3903..4c2690ba26 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -18,7 +18,6 @@ from twisted.internet import defer
 
 from synapse.types import get_domain_from_id
 from synapse.util import logcontext
-from synapse.util.logcontext import PreserveLoggingContext
 
 from ._base import BaseHandler
 
@@ -116,16 +115,15 @@ class ReceiptsHandler(BaseHandler):
 
         affected_room_ids = list(set([r["room_id"] for r in receipts]))
 
-        with PreserveLoggingContext():
-            self.notifier.on_new_event(
-                "receipt_key", max_batch_id, rooms=affected_room_ids
-            )
-            # Note that the min here shouldn't be relied upon to be accurate.
-            self.hs.get_pusherpool().on_new_receipts(
-                min_batch_id, max_batch_id, affected_room_ids
-            )
+        self.notifier.on_new_event(
+            "receipt_key", max_batch_id, rooms=affected_room_ids
+        )
+        # Note that the min here shouldn't be relied upon to be accurate.
+        yield self.hs.get_pusherpool().on_new_receipts(
+            min_batch_id, max_batch_id, affected_room_ids,
+        )
 
-            defer.returnValue(True)
+        defer.returnValue(True)
 
     @logcontext.preserve_fn   # caller should not yield on this
     @defer.inlineCallbacks
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 289704b241..015909bb26 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -28,7 +28,7 @@ from synapse.api.errors import (
 )
 from synapse.http.client import CaptchaServerHttpClient
 from synapse.types import RoomAlias, RoomID, UserID, create_requester
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.threepids import check_3pid_allowed
 
 from ._base import BaseHandler
@@ -125,6 +125,7 @@ class RegistrationHandler(BaseHandler):
         guest_access_token=None,
         make_guest=False,
         admin=False,
+        threepid=None,
     ):
         """Registers a new client on the server.
 
@@ -144,7 +145,8 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield self._check_mau_limits()
+
+        yield self.auth.check_auth_blocking(threepid=threepid)
         password_hash = None
         if password:
             password_hash = yield self.auth_handler().hash(password)
@@ -215,19 +217,65 @@ class RegistrationHandler(BaseHandler):
                     user_id = None
                     token = None
                     attempts += 1
+        if not self.hs.config.user_consent_at_registration:
+            yield self._auto_join_rooms(user_id)
+
+        defer.returnValue((user_id, token))
+
+    @defer.inlineCallbacks
+    def _auto_join_rooms(self, user_id):
+        """Automatically joins users to auto join rooms - creating the room in the first place
+        if the user is the first to be created.
 
+        Args:
+            user_id(str): The user to join
+        """
         # auto-join the user to any rooms we're supposed to dump them into
         fake_requester = create_requester(user_id)
+
+        # try to create the room if we're the first user on the server
+        should_auto_create_rooms = False
+        if self.hs.config.autocreate_auto_join_rooms:
+            count = yield self.store.count_all_users()
+            should_auto_create_rooms = count == 1
         for r in self.hs.config.auto_join_rooms:
             try:
-                yield self._join_user_to_room(fake_requester, r)
+                if should_auto_create_rooms:
+                    room_alias = RoomAlias.from_string(r)
+                    if self.hs.hostname != room_alias.domain:
+                        logger.warning(
+                            'Cannot create room alias %s, '
+                            'it does not match server domain',
+                            r,
+                        )
+                    else:
+                        # create room expects the localpart of the room alias
+                        room_alias_localpart = room_alias.localpart
+
+                        # getting the RoomCreationHandler during init gives a dependency
+                        # loop
+                        yield self.hs.get_room_creation_handler().create_room(
+                            fake_requester,
+                            config={
+                                "preset": "public_chat",
+                                "room_alias_name": room_alias_localpart
+                            },
+                            ratelimit=False,
+                        )
+                else:
+                    yield self._join_user_to_room(fake_requester, r)
             except Exception as e:
                 logger.error("Failed to join new user to %r: %r", r, e)
 
-        # We used to generate default identicons here, but nowadays
-        # we want clients to generate their own as part of their branding
-        # rather than there being consistent matrix-wide ones, so we don't.
-        defer.returnValue((user_id, token))
+    @defer.inlineCallbacks
+    def post_consent_actions(self, user_id):
+        """A series of registration actions that can only be carried out once consent
+        has been granted
+
+        Args:
+            user_id (str): The user to join
+        """
+        yield self._auto_join_rooms(user_id)
 
     @defer.inlineCallbacks
     def appservice_register(self, user_localpart, as_token):
@@ -289,7 +337,7 @@ class RegistrationHandler(BaseHandler):
                 400,
                 "User ID can only contain characters a-z, 0-9, or '=_-./'",
             )
-        yield self._check_mau_limits()
+        yield self.auth.check_auth_blocking()
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
 
@@ -439,7 +487,7 @@ class RegistrationHandler(BaseHandler):
         """
         if localpart is None:
             raise SynapseError(400, "Request must include user id")
-        yield self._check_mau_limits()
+        yield self.auth.check_auth_blocking()
         need_register = True
 
         try:
@@ -532,17 +580,5 @@ class RegistrationHandler(BaseHandler):
             room_id=room_id,
             remote_room_hosts=remote_room_hosts,
             action="join",
+            ratelimit=False,
         )
-
-    @defer.inlineCallbacks
-    def _check_mau_limits(self):
-        """
-        Do not accept registrations if monthly active user limits exceeded
-         and limiting is enabled
-        """
-        if self.hs.config.limit_usage_by_mau is True:
-            current_mau = yield self.store.count_monthly_users()
-            if current_mau >= self.hs.config.max_mau_value:
-                raise RegistrationError(
-                    403, "MAU Limit Exceeded", Codes.MAU_LIMIT_EXCEEDED
-                )
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7b7804d9b2..3928faa6e7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -21,12 +21,22 @@ import math
 import string
 from collections import OrderedDict
 
+from six import iteritems, string_types
+
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
-from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
+from synapse.api.constants import (
+    DEFAULT_ROOM_VERSION,
+    KNOWN_ROOM_VERSIONS,
+    EventTypes,
+    JoinRules,
+    RoomCreationPreset,
+)
+from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
+from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
+from synapse.util.async_helpers import Linearizer
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -64,6 +74,334 @@ class RoomCreationHandler(BaseHandler):
 
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.room_member_handler = hs.get_room_member_handler()
+
+        # linearizer to stop two upgrades happening at once
+        self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
+
+    @defer.inlineCallbacks
+    def upgrade_room(self, requester, old_room_id, new_version):
+        """Replace a room with a new room with a different version
+
+        Args:
+            requester (synapse.types.Requester): the user requesting the upgrade
+            old_room_id (unicode): the id of the room to be replaced
+            new_version (unicode): the new room version to use
+
+        Returns:
+            Deferred[unicode]: the new room id
+        """
+        yield self.ratelimit(requester)
+
+        user_id = requester.user.to_string()
+
+        with (yield self._upgrade_linearizer.queue(old_room_id)):
+            # start by allocating a new room id
+            r = yield self.store.get_room(old_room_id)
+            if r is None:
+                raise NotFoundError("Unknown room id %s" % (old_room_id,))
+            new_room_id = yield self._generate_room_id(
+                creator_id=user_id, is_public=r["is_public"],
+            )
+
+            logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
+
+            # we create and auth the tombstone event before properly creating the new
+            # room, to check our user has perms in the old room.
+            tombstone_event, tombstone_context = (
+                yield self.event_creation_handler.create_event(
+                    requester, {
+                        "type": EventTypes.Tombstone,
+                        "state_key": "",
+                        "room_id": old_room_id,
+                        "sender": user_id,
+                        "content": {
+                            "body": "This room has been replaced",
+                            "replacement_room": new_room_id,
+                        }
+                    },
+                    token_id=requester.access_token_id,
+                )
+            )
+            yield self.auth.check_from_context(tombstone_event, tombstone_context)
+
+            yield self.clone_exiting_room(
+                requester,
+                old_room_id=old_room_id,
+                new_room_id=new_room_id,
+                new_room_version=new_version,
+                tombstone_event_id=tombstone_event.event_id,
+            )
+
+            # now send the tombstone
+            yield self.event_creation_handler.send_nonmember_event(
+                requester, tombstone_event, tombstone_context,
+            )
+
+            old_room_state = yield tombstone_context.get_current_state_ids(self.store)
+
+            # update any aliases
+            yield self._move_aliases_to_new_room(
+                requester, old_room_id, new_room_id, old_room_state,
+            )
+
+            # and finally, shut down the PLs in the old room, and update them in the new
+            # room.
+            yield self._update_upgraded_room_pls(
+                requester, old_room_id, new_room_id, old_room_state,
+            )
+
+            defer.returnValue(new_room_id)
+
+    @defer.inlineCallbacks
+    def _update_upgraded_room_pls(
+            self, requester, old_room_id, new_room_id, old_room_state,
+    ):
+        """Send updated power levels in both rooms after an upgrade
+
+        Args:
+            requester (synapse.types.Requester): the user requesting the upgrade
+            old_room_id (unicode): the id of the room to be replaced
+            new_room_id (unicode): the id of the replacement room
+            old_room_state (dict[tuple[str, str], str]): the state map for the old room
+
+        Returns:
+            Deferred
+        """
+        old_room_pl_event_id = old_room_state.get((EventTypes.PowerLevels, ""))
+
+        if old_room_pl_event_id is None:
+            logger.warning(
+                "Not supported: upgrading a room with no PL event. Not setting PLs "
+                "in old room.",
+            )
+            return
+
+        old_room_pl_state = yield self.store.get_event(old_room_pl_event_id)
+
+        # we try to stop regular users from speaking by setting the PL required
+        # to send regular events and invites to 'Moderator' level. That's normally
+        # 50, but if the default PL in a room is 50 or more, then we set the
+        # required PL above that.
+
+        pl_content = dict(old_room_pl_state.content)
+        users_default = int(pl_content.get("users_default", 0))
+        restricted_level = max(users_default + 1, 50)
+
+        updated = False
+        for v in ("invite", "events_default"):
+            current = int(pl_content.get(v, 0))
+            if current < restricted_level:
+                logger.info(
+                    "Setting level for %s in %s to %i (was %i)",
+                    v, old_room_id, restricted_level, current,
+                )
+                pl_content[v] = restricted_level
+                updated = True
+            else:
+                logger.info(
+                    "Not setting level for %s (already %i)",
+                    v, current,
+                )
+
+        if updated:
+            try:
+                yield self.event_creation_handler.create_and_send_nonmember_event(
+                    requester, {
+                        "type": EventTypes.PowerLevels,
+                        "state_key": '',
+                        "room_id": old_room_id,
+                        "sender": requester.user.to_string(),
+                        "content": pl_content,
+                    }, ratelimit=False,
+                )
+            except AuthError as e:
+                logger.warning("Unable to update PLs in old room: %s", e)
+
+        logger.info("Setting correct PLs in new room")
+        yield self.event_creation_handler.create_and_send_nonmember_event(
+            requester, {
+                "type": EventTypes.PowerLevels,
+                "state_key": '',
+                "room_id": new_room_id,
+                "sender": requester.user.to_string(),
+                "content": old_room_pl_state.content,
+            }, ratelimit=False,
+        )
+
+    @defer.inlineCallbacks
+    def clone_exiting_room(
+            self, requester, old_room_id, new_room_id, new_room_version,
+            tombstone_event_id,
+    ):
+        """Populate a new room based on an old room
+
+        Args:
+            requester (synapse.types.Requester): the user requesting the upgrade
+            old_room_id (unicode): the id of the room to be replaced
+            new_room_id (unicode): the id to give the new room (should already have been
+                created with _generate_room_id())
+            new_room_version (unicode): the new room version to use
+            tombstone_event_id (unicode|str): the ID of the tombstone event in the old
+                room.
+        Returns:
+            Deferred[None]
+        """
+        user_id = requester.user.to_string()
+
+        if not self.spam_checker.user_may_create_room(user_id):
+            raise SynapseError(403, "You are not permitted to create rooms")
+
+        creation_content = {
+            "room_version": new_room_version,
+            "predecessor": {
+                "room_id": old_room_id,
+                "event_id": tombstone_event_id,
+            }
+        }
+
+        initial_state = dict()
+
+        types_to_copy = (
+            (EventTypes.JoinRules, ""),
+            (EventTypes.Name, ""),
+            (EventTypes.Topic, ""),
+            (EventTypes.RoomHistoryVisibility, ""),
+            (EventTypes.GuestAccess, ""),
+            (EventTypes.RoomAvatar, ""),
+        )
+
+        old_room_state_ids = yield self.store.get_filtered_current_state_ids(
+            old_room_id, StateFilter.from_types(types_to_copy),
+        )
+        # map from event_id to BaseEvent
+        old_room_state_events = yield self.store.get_events(old_room_state_ids.values())
+
+        for k, old_event_id in iteritems(old_room_state_ids):
+            old_event = old_room_state_events.get(old_event_id)
+            if old_event:
+                initial_state[k] = old_event.content
+
+        yield self._send_events_for_new_room(
+            requester,
+            new_room_id,
+
+            # we expect to override all the presets with initial_state, so this is
+            # somewhat arbitrary.
+            preset_config=RoomCreationPreset.PRIVATE_CHAT,
+
+            invite_list=[],
+            initial_state=initial_state,
+            creation_content=creation_content,
+        )
+
+        # XXX invites/joins
+        # XXX 3pid invites
+
+    @defer.inlineCallbacks
+    def _move_aliases_to_new_room(
+            self, requester, old_room_id, new_room_id, old_room_state,
+    ):
+        directory_handler = self.hs.get_handlers().directory_handler
+
+        aliases = yield self.store.get_aliases_for_room(old_room_id)
+
+        # check to see if we have a canonical alias.
+        canonical_alias = None
+        canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, ""))
+        if canonical_alias_event_id:
+            canonical_alias_event = yield self.store.get_event(canonical_alias_event_id)
+            if canonical_alias_event:
+                canonical_alias = canonical_alias_event.content.get("alias", "")
+
+        # first we try to remove the aliases from the old room (we suppress sending
+        # the room_aliases event until the end).
+        #
+        # Note that we'll only be able to remove aliases that (a) aren't owned by an AS,
+        # and (b) unless the user is a server admin, which the user created.
+        #
+        # This is probably correct - given we don't allow such aliases to be deleted
+        # normally, it would be odd to allow it in the case of doing a room upgrade -
+        # but it makes the upgrade less effective, and you have to wonder why a room
+        # admin can't remove aliases that point to that room anyway.
+        # (cf https://github.com/matrix-org/synapse/issues/2360)
+        #
+        removed_aliases = []
+        for alias_str in aliases:
+            alias = RoomAlias.from_string(alias_str)
+            try:
+                yield directory_handler.delete_association(
+                    requester, alias, send_event=False,
+                )
+                removed_aliases.append(alias_str)
+            except SynapseError as e:
+                logger.warning(
+                    "Unable to remove alias %s from old room: %s",
+                    alias, e,
+                )
+
+        # if we didn't find any aliases, or couldn't remove any, we can skip the rest
+        # of this.
+        if not removed_aliases:
+            return
+
+        try:
+            # this can fail if, for some reason, our user doesn't have perms to send
+            # m.room.aliases events in the old room (note that we've already checked that
+            # they have perms to send a tombstone event, so that's not terribly likely).
+            #
+            # If that happens, it's regrettable, but we should carry on: it's the same
+            # as when you remove an alias from the directory normally - it just means that
+            # the aliases event gets out of sync with the directory
+            # (cf https://github.com/vector-im/riot-web/issues/2369)
+            yield directory_handler.send_room_alias_update_event(
+                requester, old_room_id,
+            )
+        except AuthError as e:
+            logger.warning(
+                "Failed to send updated alias event on old room: %s", e,
+            )
+
+        # we can now add any aliases we successfully removed to the new room.
+        for alias in removed_aliases:
+            try:
+                yield directory_handler.create_association(
+                    requester, RoomAlias.from_string(alias),
+                    new_room_id, servers=(self.hs.hostname, ),
+                    send_event=False,
+                )
+                logger.info("Moved alias %s to new room", alias)
+            except SynapseError as e:
+                # I'm not really expecting this to happen, but it could if the spam
+                # checking module decides it shouldn't, or similar.
+                logger.error(
+                    "Error adding alias %s to new room: %s",
+                    alias, e,
+                )
+
+        try:
+            if canonical_alias and (canonical_alias in removed_aliases):
+                yield self.event_creation_handler.create_and_send_nonmember_event(
+                    requester,
+                    {
+                        "type": EventTypes.CanonicalAlias,
+                        "state_key": "",
+                        "room_id": new_room_id,
+                        "sender": requester.user.to_string(),
+                        "content": {"alias": canonical_alias, },
+                    },
+                    ratelimit=False
+                )
+
+            yield directory_handler.send_room_alias_update_event(
+                requester, new_room_id,
+            )
+        except SynapseError as e:
+            # again I'm not really expecting this to fail, but if it does, I'd rather
+            # we returned the new room to the client at this point.
+            logger.error(
+                "Unable to send updated alias events in new room: %s", e,
+            )
 
     @defer.inlineCallbacks
     def create_room(self, requester, config, ratelimit=True,
@@ -90,15 +428,34 @@ class RoomCreationHandler(BaseHandler):
         Raises:
             SynapseError if the room ID couldn't be stored, or something went
             horribly wrong.
+            ResourceLimitError if the server is blocked due to some resource
+            being exceeded
         """
         user_id = requester.user.to_string()
 
+        yield self.auth.check_auth_blocking(user_id)
+
         if not self.spam_checker.user_may_create_room(user_id):
             raise SynapseError(403, "You are not permitted to create rooms")
 
         if ratelimit:
             yield self.ratelimit(requester)
 
+        room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
+        if not isinstance(room_version, string_types):
+            raise SynapseError(
+                400,
+                "room_version must be a string",
+                Codes.BAD_JSON,
+            )
+
+        if room_version not in KNOWN_ROOM_VERSIONS:
+            raise SynapseError(
+                400,
+                "Your homeserver does not support this room version",
+                Codes.UNSUPPORTED_ROOM_VERSION,
+            )
+
         if "room_alias_name" in config:
             for wchar in string.whitespace:
                 if wchar in config["room_alias_name"]:
@@ -137,36 +494,16 @@ class RoomCreationHandler(BaseHandler):
         visibility = config.get("visibility", None)
         is_public = visibility == "public"
 
-        # autogen room IDs and try to create it. We may clash, so just
-        # try a few times till one goes through, giving up eventually.
-        attempts = 0
-        room_id = None
-        while attempts < 5:
-            try:
-                random_string = stringutils.random_string(18)
-                gen_room_id = RoomID(
-                    random_string,
-                    self.hs.hostname,
-                )
-                yield self.store.store_room(
-                    room_id=gen_room_id.to_string(),
-                    room_creator_user_id=user_id,
-                    is_public=is_public
-                )
-                room_id = gen_room_id.to_string()
-                break
-            except StoreError:
-                attempts += 1
-        if not room_id:
-            raise StoreError(500, "Couldn't generate a room ID.")
+        room_id = yield self._generate_room_id(creator_id=user_id, is_public=is_public)
 
         if room_alias:
             directory_handler = self.hs.get_handlers().directory_handler
             yield directory_handler.create_association(
-                user_id=user_id,
+                requester=requester,
                 room_id=room_id,
                 room_alias=room_alias,
                 servers=[self.hs.hostname],
+                send_event=False,
             )
 
         preset_config = config.get(
@@ -184,18 +521,18 @@ class RoomCreationHandler(BaseHandler):
 
         creation_content = config.get("creation_content", {})
 
-        room_member_handler = self.hs.get_room_member_handler()
+        # override any attempt to set room versions via the creation_content
+        creation_content["room_version"] = room_version
 
         yield self._send_events_for_new_room(
             requester,
             room_id,
-            room_member_handler,
             preset_config=preset_config,
             invite_list=invite_list,
             initial_state=initial_state,
             creation_content=creation_content,
             room_alias=room_alias,
-            power_level_content_override=config.get("power_level_content_override", {}),
+            power_level_content_override=config.get("power_level_content_override"),
             creator_join_profile=creator_join_profile,
         )
 
@@ -231,7 +568,7 @@ class RoomCreationHandler(BaseHandler):
             if is_direct:
                 content["is_direct"] = is_direct
 
-            yield room_member_handler.update_membership(
+            yield self.room_member_handler.update_membership(
                 requester,
                 UserID.from_string(invitee),
                 room_id,
@@ -259,7 +596,7 @@ class RoomCreationHandler(BaseHandler):
         if room_alias:
             result["room_alias"] = room_alias.to_string()
             yield directory_handler.send_room_alias_update_event(
-                requester, user_id, room_id
+                requester, room_id
             )
 
         defer.returnValue(result)
@@ -269,14 +606,13 @@ class RoomCreationHandler(BaseHandler):
             self,
             creator,  # A Requester object.
             room_id,
-            room_member_handler,
             preset_config,
             invite_list,
             initial_state,
             creation_content,
-            room_alias,
-            power_level_content_override,
-            creator_join_profile,
+            room_alias=None,
+            power_level_content_override=None,
+            creator_join_profile=None,
     ):
         def create(etype, content, **kwargs):
             e = {
@@ -292,6 +628,7 @@ class RoomCreationHandler(BaseHandler):
         @defer.inlineCallbacks
         def send(etype, content, **kwargs):
             event = create(etype, content, **kwargs)
+            logger.info("Sending %s in new room", etype)
             yield self.event_creation_handler.create_and_send_nonmember_event(
                 creator,
                 event,
@@ -314,7 +651,8 @@ class RoomCreationHandler(BaseHandler):
             content=creation_content,
         )
 
-        yield room_member_handler.update_membership(
+        logger.info("Sending %s in new room", EventTypes.Member)
+        yield self.room_member_handler.update_membership(
             creator,
             creator.user,
             room_id,
@@ -356,7 +694,8 @@ class RoomCreationHandler(BaseHandler):
                 for invitee in invite_list:
                     power_level_content["users"][invitee] = 100
 
-            power_level_content.update(power_level_content_override)
+            if power_level_content_override:
+                power_level_content.update(power_level_content_override)
 
             yield send(
                 etype=EventTypes.PowerLevels,
@@ -395,6 +734,30 @@ class RoomCreationHandler(BaseHandler):
                 content=content,
             )
 
+    @defer.inlineCallbacks
+    def _generate_room_id(self, creator_id, is_public):
+        # autogen room IDs and try to create it. We may clash, so just
+        # try a few times till one goes through, giving up eventually.
+        attempts = 0
+        while attempts < 5:
+            try:
+                random_string = stringutils.random_string(18)
+                gen_room_id = RoomID(
+                    random_string,
+                    self.hs.hostname,
+                ).to_string()
+                if isinstance(gen_room_id, bytes):
+                    gen_room_id = gen_room_id.decode('utf-8')
+                yield self.store.store_room(
+                    room_id=gen_room_id,
+                    room_creator_user_id=creator_id,
+                    is_public=is_public,
+                )
+                defer.returnValue(gen_room_id)
+            except StoreError:
+                attempts += 1
+        raise StoreError(500, "Couldn't generate a room ID.")
+
 
 class RoomContextHandler(object):
     def __init__(self, hs):
@@ -458,23 +821,24 @@ class RoomContextHandler(object):
         else:
             last_event_id = event_id
 
-        types = None
-        filtered_types = None
         if event_filter and event_filter.lazy_load_members():
-            members = set(ev.sender for ev in itertools.chain(
-                results["events_before"],
-                (results["event"],),
-                results["events_after"],
-            ))
-            filtered_types = [EventTypes.Member]
-            types = [(EventTypes.Member, member) for member in members]
+            state_filter = StateFilter.from_lazy_load_member_list(
+                ev.sender
+                for ev in itertools.chain(
+                    results["events_before"],
+                    (results["event"],),
+                    results["events_after"],
+                )
+            )
+        else:
+            state_filter = StateFilter.all()
 
         # XXX: why do we return the state as of the last event rather than the
         # first? Shouldn't we be consistent with /sync?
         # https://github.com/matrix-org/matrix-doc/issues/687
 
         state = yield self.store.get_state_for_events(
-            [last_event_id], types, filtered_types=filtered_types,
+            [last_event_id], state_filter=state_filter,
         )
         results["state"] = list(state[last_event_id].values())
 
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 828229f5c3..dc88620885 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -16,7 +16,7 @@
 import logging
 from collections import namedtuple
 
-from six import iteritems
+from six import PY3, iteritems
 from six.moves import range
 
 import msgpack
@@ -26,7 +26,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules
 from synapse.types import ThirdPartyInstanceID
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
 
@@ -162,7 +162,7 @@ class RoomListHandler(BaseHandler):
         # Filter out rooms that we don't want to return
         rooms_to_scan = [
             r for r in sorted_rooms
-            if r not in newly_unpublished and rooms_to_num_joined[room_id] > 0
+            if r not in newly_unpublished and rooms_to_num_joined[r] > 0
         ]
 
         total_room_count = len(rooms_to_scan)
@@ -444,9 +444,16 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", (
 
     @classmethod
     def from_token(cls, token):
+        if PY3:
+            # The argument raw=False is only available on new versions of
+            # msgpack, and only really needed on Python 3. Gate it behind
+            # a PY3 check to avoid causing issues on Debian-packaged versions.
+            decoded = msgpack.loads(decode_base64(token), raw=False)
+        else:
+            decoded = msgpack.loads(decode_base64(token))
         return RoomListNextBatch(**{
             cls.REVERSE_KEY_DICT[key]: val
-            for key, val in msgpack.loads(decode_base64(token)).items()
+            for key, val in decoded.items()
         })
 
     def to_token(self):
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0d4a3f4677..07fd3e82fc 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -30,7 +30,7 @@ import synapse.types
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.types import RoomID, UserID
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
 
 logger = logging.getLogger(__name__)
@@ -344,6 +344,7 @@ class RoomMemberHandler(object):
         latest_event_ids = (
             event_id for (event_id, _, _) in prev_events_and_hashes
         )
+
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
         )
@@ -582,6 +583,11 @@ class RoomMemberHandler(object):
         room_id = mapping["room_id"]
         servers = mapping["servers"]
 
+        # put the server which owns the alias at the front of the server list.
+        if room_alias.domain in servers:
+            servers.remove(room_alias.domain)
+        servers.insert(0, room_alias.domain)
+
         defer.returnValue((RoomID.from_string(room_id), servers))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 22d8b4b0d3..acc6eb8099 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -20,16 +20,24 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError
 from synapse.handlers.room_member import RoomMemberHandler
 from synapse.replication.http.membership import (
-    get_or_register_3pid_guest,
-    notify_user_membership_change,
-    remote_join,
-    remote_reject_invite,
+    ReplicationRegister3PIDGuestRestServlet as Repl3PID,
+    ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
+    ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
+    ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
 )
 
 logger = logging.getLogger(__name__)
 
 
 class RoomMemberWorkerHandler(RoomMemberHandler):
+    def __init__(self, hs):
+        super(RoomMemberWorkerHandler, self).__init__(hs)
+
+        self._get_register_3pid_client = Repl3PID.make_client(hs)
+        self._remote_join_client = ReplRemoteJoin.make_client(hs)
+        self._remote_reject_client = ReplRejectInvite.make_client(hs)
+        self._notify_change_client = ReplJoinedLeft.make_client(hs)
+
     @defer.inlineCallbacks
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
         """Implements RoomMemberHandler._remote_join
@@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
         if len(remote_room_hosts) == 0:
             raise SynapseError(404, "No known servers")
 
-        ret = yield remote_join(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        ret = yield self._remote_join_client(
             requester=requester,
             remote_room_hosts=remote_room_hosts,
             room_id=room_id,
@@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
         """Implements RoomMemberHandler._remote_reject_invite
         """
-        return remote_reject_invite(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._remote_reject_client(
             requester=requester,
             remote_room_hosts=remote_room_hosts,
             room_id=room_id,
@@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _user_joined_room(self, target, room_id):
         """Implements RoomMemberHandler._user_joined_room
         """
-        return notify_user_membership_change(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._notify_change_client(
             user_id=target.to_string(),
             room_id=room_id,
             change="joined",
@@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _user_left_room(self, target, room_id):
         """Implements RoomMemberHandler._user_left_room
         """
-        return notify_user_membership_change(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._notify_change_client(
             user_id=target.to_string(),
             room_id=room_id,
             change="left",
@@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
         """Implements RoomMemberHandler.get_or_register_3pid_guest
         """
-        return get_or_register_3pid_guest(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._get_register_3pid_client(
             requester=requester,
             medium=medium,
             address=address,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index c464adbd0b..ec936bbb4e 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -24,6 +24,7 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
 from synapse.events.utils import serialize_event
+from synapse.storage.state import StateFilter
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -49,12 +50,15 @@ class SearchHandler(BaseHandler):
             dict to be returned to the client with results of search
         """
 
+        if not self.hs.config.enable_search:
+            raise SynapseError(400, "Search is disabled on this homeserver")
+
         batch_group = None
         batch_group_key = None
         batch_token = None
         if batch:
             try:
-                b = decode_base64(batch)
+                b = decode_base64(batch).decode('ascii')
                 batch_group, batch_group_key, batch_token = b.split("\n")
 
                 assert batch_group is not None
@@ -258,18 +262,18 @@ class SearchHandler(BaseHandler):
                 # it returns more from the same group (if applicable) rather
                 # than reverting to searching all results again.
                 if batch_group and batch_group_key:
-                    global_next_batch = encode_base64("%s\n%s\n%s" % (
+                    global_next_batch = encode_base64(("%s\n%s\n%s" % (
                         batch_group, batch_group_key, pagination_token
-                    ))
+                    )).encode('ascii'))
                 else:
-                    global_next_batch = encode_base64("%s\n%s\n%s" % (
+                    global_next_batch = encode_base64(("%s\n%s\n%s" % (
                         "all", "", pagination_token
-                    ))
+                    )).encode('ascii'))
 
                 for room_id, group in room_groups.items():
-                    group["next_batch"] = encode_base64("%s\n%s\n%s" % (
+                    group["next_batch"] = encode_base64(("%s\n%s\n%s" % (
                         "room_id", room_id, pagination_token
-                    ))
+                    )).encode('ascii'))
 
             allowed_events.extend(room_events)
 
@@ -324,9 +328,12 @@ class SearchHandler(BaseHandler):
                     else:
                         last_event_id = event.event_id
 
+                    state_filter = StateFilter.from_types(
+                        [(EventTypes.Member, sender) for sender in senders]
+                    )
+
                     state = yield self.store.get_state_for_event(
-                        last_event_id,
-                        types=[(EventTypes.Member, sender) for sender in senders]
+                        last_event_id, state_filter
                     )
 
                     res["profile_info"] = {
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index dff1f67dcb..09739f2862 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,12 +20,16 @@ import logging
 
 from six import iteritems, itervalues
 
+from prometheus_client import Counter
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
+from synapse.storage.roommember import MemberSummary
+from synapse.storage.state import StateFilter
 from synapse.types import RoomStreamToken
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.response_cache import ResponseCache
@@ -35,6 +39,19 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+
+# Counts the number of times we returned a non-empty sync. `type` is one of
+# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
+# "true" or "false" depending on if the request asked for lazy loaded members or
+# not.
+non_empty_sync_counter = Counter(
+    "synapse_handlers_sync_nonempty_total",
+    "Count of non empty sync responses. type is initial_sync/full_state_sync"
+    "/incremental_sync. lazy_loaded indicates if lazy loaded members were "
+    "enabled for that request.",
+    ["type", "lazy_loaded"],
+)
+
 # Store the cache that tracks which lazy-loaded members have been sent to a given
 # client for no more than 30 minutes.
 LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
@@ -75,6 +92,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
     "ephemeral",
     "account_data",
     "unread_notifications",
+    "summary",
 ])):
     __slots__ = []
 
@@ -184,6 +202,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
 class SyncHandler(object):
 
     def __init__(self, hs):
+        self.hs_config = hs.config
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
         self.presence_handler = hs.get_presence_handler()
@@ -191,6 +210,7 @@ class SyncHandler(object):
         self.clock = hs.get_clock()
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
 
         # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
         self.lazy_loaded_members_cache = ExpiringCache(
@@ -198,31 +218,41 @@ class SyncHandler(object):
             max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
         )
 
+    @defer.inlineCallbacks
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
         Returns:
-            A Deferred SyncResult.
+            Deferred[SyncResult]
         """
-        return self.response_cache.wrap(
+        # If the user is not part of the mau group, then check that limits have
+        # not been exceeded (if they are not part of the group by this point,
+        # auth blocking will almost certainly occur)
+        user_id = sync_config.user.to_string()
+        yield self.auth.check_auth_blocking(user_id)
+
+        res = yield self.response_cache.wrap(
             sync_config.request_key,
             self._wait_for_sync_for_user,
             sync_config, since_token, timeout, full_state,
         )
+        defer.returnValue(res)
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
                                 full_state):
+        if since_token is None:
+            sync_type = "initial_sync"
+        elif full_state:
+            sync_type = "full_state_sync"
+        else:
+            sync_type = "incremental_sync"
+
         context = LoggingContext.current_context()
         if context:
-            if since_token is None:
-                context.tag = "initial_sync"
-            elif full_state:
-                context.tag = "full_state_sync"
-            else:
-                context.tag = "incremental_sync"
+            context.tag = sync_type
 
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
@@ -230,7 +260,6 @@ class SyncHandler(object):
             result = yield self.current_sync_for_user(
                 sync_config, since_token, full_state=full_state,
             )
-            defer.returnValue(result)
         else:
             def current_sync_callback(before_token, after_token):
                 return self.current_sync_for_user(sync_config, since_token)
@@ -239,7 +268,15 @@ class SyncHandler(object):
                 sync_config.user.to_string(), timeout, current_sync_callback,
                 from_token=since_token,
             )
-            defer.returnValue(result)
+
+        if result:
+            if sync_config.filter_collection.lazy_load_members():
+                lazy_loaded = "true"
+            else:
+                lazy_loaded = "false"
+            non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
+
+        defer.returnValue(result)
 
     def current_sync_for_user(self, sync_config, since_token=None,
                               full_state=False):
@@ -433,25 +470,20 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def get_state_after_event(self, event, types=None, filtered_types=None):
+    def get_state_after_event(self, event, state_filter=StateFilter.all()):
         """
         Get the room state after the given event
 
         Args:
             event(synapse.events.EventBase): event of interest
-            types(list[(str, str|None)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. If `state_key` is None,
-                all events are returned of the given type.
-                May be None, which matches any key.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
         state_ids = yield self.store.get_state_ids_for_event(
-            event.event_id, types, filtered_types=filtered_types,
+            event.event_id, state_filter=state_filter,
         )
         if event.is_state():
             state_ids = state_ids.copy()
@@ -459,18 +491,14 @@ class SyncHandler(object):
         defer.returnValue(state_ids)
 
     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
+    def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()):
         """ Get the room state at a particular stream position
 
         Args:
             room_id(str): room for which to get state
             stream_position(StreamToken): point at which to get state
-            types(list[(str, str|None)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. If `state_key` is None,
-                all events are returned of the given type.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
@@ -486,7 +514,7 @@ class SyncHandler(object):
         if last_events:
             last_event = last_events[-1]
             state = yield self.get_state_after_event(
-                last_event, types, filtered_types=filtered_types,
+                last_event, state_filter=state_filter,
             )
 
         else:
@@ -495,9 +523,171 @@ class SyncHandler(object):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
+    def compute_summary(self, room_id, sync_config, batch, state, now_token):
+        """ Works out a room summary block for this room, summarising the number
+        of joined members in the room, and providing the 'hero' members if the
+        room has no name so clients can consistently name rooms.  Also adds
+        state events to 'state' if needed to describe the heroes.
+
+        Args:
+            room_id(str):
+            sync_config(synapse.handlers.sync.SyncConfig):
+            batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
+                the room that will be sent to the user.
+            state(dict): dict of (type, state_key) -> Event as returned by
+                compute_state_delta
+            now_token(StreamToken): Token of the end of the current batch.
+
+        Returns:
+             A deferred dict describing the room summary (or None if no events found)
+        """
+
+        # FIXME: we could/should get this from room_stats when matthew/stats lands
+
+        # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
+        last_events, _ = yield self.store.get_recent_event_ids_for_room(
+            room_id, end_token=now_token.room_key, limit=1,
+        )
+
+        if not last_events:
+            defer.returnValue(None)
+            return
+
+        last_event = last_events[-1]
+        state_ids = yield self.store.get_state_ids_for_event(
+            last_event.event_id,
+            state_filter=StateFilter.from_types([
+                (EventTypes.Name, ''),
+                (EventTypes.CanonicalAlias, ''),
+            ]),
+        )
+
+        # this is heavily cached, thus: fast.
+        details = yield self.store.get_room_summary(room_id)
+
+        name_id = state_ids.get((EventTypes.Name, ''))
+        canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
+
+        summary = {}
+        empty_ms = MemberSummary([], 0)
+
+        # TODO: only send these when they change.
+        summary["m.joined_member_count"] = (
+            details.get(Membership.JOIN, empty_ms).count
+        )
+        summary["m.invited_member_count"] = (
+            details.get(Membership.INVITE, empty_ms).count
+        )
+
+        # if the room has a name or canonical_alias set, we can skip
+        # calculating heroes.  we assume that if the event has contents, it'll
+        # be a valid name or canonical_alias - i.e. we're checking that they
+        # haven't been "deleted" by blatting {} over the top.
+        if name_id:
+            name = yield self.store.get_event(name_id, allow_none=True)
+            if name and name.content:
+                defer.returnValue(summary)
+
+        if canonical_alias_id:
+            canonical_alias = yield self.store.get_event(
+                canonical_alias_id, allow_none=True,
+            )
+            if canonical_alias and canonical_alias.content:
+                defer.returnValue(summary)
+
+        joined_user_ids = [
+            r[0] for r in details.get(Membership.JOIN, empty_ms).members
+        ]
+        invited_user_ids = [
+            r[0] for r in details.get(Membership.INVITE, empty_ms).members
+        ]
+        gone_user_ids = (
+            [r[0] for r in details.get(Membership.LEAVE, empty_ms).members] +
+            [r[0] for r in details.get(Membership.BAN, empty_ms).members]
+        )
+
+        # FIXME: only build up a member_ids list for our heroes
+        member_ids = {}
+        for membership in (
+            Membership.JOIN,
+            Membership.INVITE,
+            Membership.LEAVE,
+            Membership.BAN
+        ):
+            for user_id, event_id in details.get(membership, empty_ms).members:
+                member_ids[user_id] = event_id
+
+        # FIXME: order by stream ordering rather than as returned by SQL
+        me = sync_config.user.to_string()
+        if (joined_user_ids or invited_user_ids):
+            summary['m.heroes'] = sorted(
+                [
+                    user_id
+                    for user_id in (joined_user_ids + invited_user_ids)
+                    if user_id != me
+                ]
+            )[0:5]
+        else:
+            summary['m.heroes'] = sorted(
+                [
+                    user_id
+                    for user_id in gone_user_ids
+                    if user_id != me
+                ]
+            )[0:5]
+
+        if not sync_config.filter_collection.lazy_load_members():
+            defer.returnValue(summary)
+
+        # ensure we send membership events for heroes if needed
+        cache_key = (sync_config.user.to_string(), sync_config.device_id)
+        cache = self.get_lazy_loaded_members_cache(cache_key)
+
+        # track which members the client should already know about via LL:
+        # Ones which are already in state...
+        existing_members = set(
+            user_id for (typ, user_id) in state.keys()
+            if typ == EventTypes.Member
+        )
+
+        # ...or ones which are in the timeline...
+        for ev in batch.events:
+            if ev.type == EventTypes.Member:
+                existing_members.add(ev.state_key)
+
+        # ...and then ensure any missing ones get included in state.
+        missing_hero_event_ids = [
+            member_ids[hero_id]
+            for hero_id in summary['m.heroes']
+            if (
+                cache.get(hero_id) != member_ids[hero_id] and
+                hero_id not in existing_members
+            )
+        ]
+
+        missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
+        missing_hero_state = missing_hero_state.values()
+
+        for s in missing_hero_state:
+            cache.set(s.state_key, s.event_id)
+            state[(EventTypes.Member, s.state_key)] = s
+
+        defer.returnValue(summary)
+
+    def get_lazy_loaded_members_cache(self, cache_key):
+        cache = self.lazy_loaded_members_cache.get(cache_key)
+        if cache is None:
+            logger.debug("creating LruCache for %r", cache_key)
+            cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+            self.lazy_loaded_members_cache[cache_key] = cache
+        else:
+            logger.debug("found LruCache for %r", cache_key)
+        return cache
+
+    @defer.inlineCallbacks
     def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
                             full_state):
-        """ Works out the differnce in state between the start of the timeline
+        """ Works out the difference in state between the start of the timeline
         and the previous sync.
 
         Args:
@@ -511,7 +701,7 @@ class SyncHandler(object):
             full_state(bool): Whether to force returning the full state.
 
         Returns:
-             A deferred new event dictionary
+             A deferred dict of (type, state_key) -> Event
         """
         # TODO(mjark) Check if the state events were received by the server
         # after the previous sync, since we need to include those state
@@ -520,8 +710,7 @@ class SyncHandler(object):
 
         with Measure(self.clock, "compute_state_delta"):
 
-            types = None
-            filtered_types = None
+            members_to_fetch = None
 
             lazy_load_members = sync_config.filter_collection.lazy_load_members()
             include_redundant_members = (
@@ -532,16 +721,21 @@ class SyncHandler(object):
                 # We only request state for the members needed to display the
                 # timeline:
 
-                types = [
-                    (EventTypes.Member, state_key)
-                    for state_key in set(
-                        event.sender  # FIXME: we also care about invite targets etc.
-                        for event in batch.events
-                    )
-                ]
+                members_to_fetch = set(
+                    event.sender  # FIXME: we also care about invite targets etc.
+                    for event in batch.events
+                )
 
-                # only apply the filtering to room members
-                filtered_types = [EventTypes.Member]
+                if full_state:
+                    # always make sure we LL ourselves so we know we're in the room
+                    # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
+                    # We only need to apply this on full state syncs given we disabled
+                    # LL for incr syncs in #3840.
+                    members_to_fetch.add(sync_config.user.to_string())
+
+                state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+            else:
+                state_filter = StateFilter.all()
 
             timeline_state = {
                 (event.type, event.state_key): event.event_id
@@ -551,19 +745,17 @@ class SyncHandler(object):
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id, types=types,
-                        filtered_types=filtered_types,
+                        batch.events[-1].event_id, state_filter=state_filter,
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id, types=types,
-                        filtered_types=filtered_types,
+                        batch.events[0].event_id, state_filter=state_filter,
                     )
 
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token, types=types,
-                        filtered_types=filtered_types,
+                        room_id, stream_position=now_token,
+                        state_filter=state_filter,
                     )
 
                     state_ids = current_state_ids
@@ -576,19 +768,31 @@ class SyncHandler(object):
                     lazy_load_members=lazy_load_members,
                 )
             elif batch.limited:
-                state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token, types=types,
-                    filtered_types=filtered_types,
+                state_at_timeline_start = yield self.store.get_state_ids_for_event(
+                    batch.events[0].event_id, state_filter=state_filter,
                 )
 
-                current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id, types=types,
-                    filtered_types=filtered_types,
+                # for now, we disable LL for gappy syncs - see
+                # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
+                # N.B. this slows down incr syncs as we are now processing way
+                # more state in the server than if we were LLing.
+                #
+                # We still have to filter timeline_start to LL entries (above) in order
+                # for _calculate_state's LL logic to work, as we have to include LL
+                # members for timeline senders in case they weren't loaded in the initial
+                # sync.  We do this (counterintuitively) by filtering timeline_start
+                # members to just be ones which were timeline senders, which then ensures
+                # all of the rest get included in the state block (if we need to know
+                # about them).
+                state_filter = StateFilter.all()
+
+                state_at_previous_sync = yield self.get_state_at(
+                    room_id, stream_position=since_token,
+                    state_filter=state_filter,
                 )
 
-                state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id, types=types,
-                    filtered_types=filtered_types,
+                current_state_ids = yield self.store.get_state_ids_for_event(
+                    batch.events[-1].event_id, state_filter=state_filter,
                 )
 
                 state_ids = _calculate_state(
@@ -596,26 +800,33 @@ class SyncHandler(object):
                     timeline_start=state_at_timeline_start,
                     previous=state_at_previous_sync,
                     current=current_state_ids,
+                    # we have to include LL members in case LL initial sync missed them
                     lazy_load_members=lazy_load_members,
                 )
             else:
                 state_ids = {}
                 if lazy_load_members:
-                    if types:
+                    if members_to_fetch and batch.events:
+                        # We're returning an incremental sync, with no
+                        # "gap" since the previous sync, so normally there would be
+                        # no state to return.
+                        # But we're lazy-loading, so the client might need some more
+                        # member events to understand the events in this timeline.
+                        # So we fish out all the member events corresponding to the
+                        # timeline here, and then dedupe any redundant ones below.
+
                         state_ids = yield self.store.get_state_ids_for_event(
-                            batch.events[0].event_id, types=types,
-                            filtered_types=filtered_types,
+                            batch.events[0].event_id,
+                            # we only want members!
+                            state_filter=StateFilter.from_types(
+                                (EventTypes.Member, member)
+                                for member in members_to_fetch
+                            ),
                         )
 
             if lazy_load_members and not include_redundant_members:
                 cache_key = (sync_config.user.to_string(), sync_config.device_id)
-                cache = self.lazy_loaded_members_cache.get(cache_key)
-                if cache is None:
-                    logger.debug("creating LruCache for %r", cache_key)
-                    cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
-                    self.lazy_loaded_members_cache[cache_key] = cache
-                else:
-                    logger.debug("found LruCache for %r", cache_key)
+                cache = self.get_lazy_loaded_members_cache(cache_key)
 
                 # if it's a new sync sequence, then assume the client has had
                 # amnesia and doesn't want any recent lazy-loaded members
@@ -630,7 +841,7 @@ class SyncHandler(object):
                     logger.debug("filtering state from %r...", state_ids)
                     state_ids = {
                         t: event_id
-                        for t, event_id in state_ids.iteritems()
+                        for t, event_id in iteritems(state_ids)
                         if cache.get(t[1]) != event_id
                     }
                     logger.debug("...to %r", state_ids)
@@ -724,7 +935,7 @@ class SyncHandler(object):
             since_token is None and
             sync_config.filter_collection.blocks_all_presence()
         )
-        if not block_all_presence_data:
+        if self.hs_config.use_presence and not block_all_presence_data:
             yield self._generate_sync_entry_for_presence(
                 sync_result_builder, newly_joined_rooms, newly_joined_users
             )
@@ -1416,7 +1627,6 @@ class SyncHandler(object):
             if events == [] and tags is None:
                 return
 
-        since_token = sync_result_builder.since_token
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
 
@@ -1432,6 +1642,19 @@ class SyncHandler(object):
             newly_joined_room=newly_joined,
         )
 
+        # When we join the room (or the client requests full_state), we should
+        # send down any existing tags. Usually the user won't have tags in a
+        # newly joined room, unless either a) they've joined before or b) the
+        # tag was added by synapse e.g. for server notice rooms.
+        if full_state:
+            user_id = sync_result_builder.sync_config.user.to_string()
+            tags = yield self.store.get_tags_for_room(user_id, room_id)
+
+            # If there aren't any tags, don't send the empty tags list down
+            # sync
+            if not tags:
+                tags = None
+
         account_data_events = []
         if tags is not None:
             account_data_events.append({
@@ -1459,6 +1682,32 @@ class SyncHandler(object):
             full_state=full_state
         )
 
+        summary = {}
+
+        # we include a summary in room responses when we're lazy loading
+        # members (as the client otherwise doesn't have enough info to form
+        # the name itself).
+        if (
+            sync_config.filter_collection.lazy_load_members() and
+            (
+                # we recalculate the summary:
+                #   if there are membership changes in the timeline, or
+                #   if membership has changed during a gappy sync, or
+                #   if this is an initial sync.
+                any(ev.type == EventTypes.Member for ev in batch.events) or
+                (
+                    # XXX: this may include false positives in the form of LL
+                    # members which have snuck into state
+                    batch.limited and
+                    any(t == EventTypes.Member for (t, k) in state)
+                ) or
+                since_token is None
+            )
+        ):
+            summary = yield self.compute_summary(
+                room_id, sync_config, batch, state, now_token
+            )
+
         if room_builder.rtype == "joined":
             unread_notifications = {}
             room_sync = JoinedSyncResult(
@@ -1468,6 +1717,7 @@ class SyncHandler(object):
                 ephemeral=ephemeral,
                 account_data=account_data_events,
                 unread_notifications=unread_notifications,
+                summary=summary,
             )
 
             if room_sync or always_include:
@@ -1480,6 +1730,16 @@ class SyncHandler(object):
                     unread_notifications["highlight_count"] = notifs["highlight_count"]
 
                 sync_result_builder.joined.append(room_sync)
+
+            if batch.limited and since_token:
+                user_id = sync_result_builder.sync_config.user.to_string()
+                logger.info(
+                    "Incremental gappy sync of %s for user %s with %d state events" % (
+                        room_id,
+                        user_id,
+                        len(state),
+                    )
+                )
         elif room_builder.rtype == "archived":
             room_sync = ArchivedSyncResult(
                 room_id=room_id,
@@ -1573,17 +1833,17 @@ def _calculate_state(
     event_id_to_key = {
         e: key
         for key, e in itertools.chain(
-            timeline_contains.items(),
-            previous.items(),
-            timeline_start.items(),
-            current.items(),
+            iteritems(timeline_contains),
+            iteritems(previous),
+            iteritems(timeline_start),
+            iteritems(current),
         )
     }
 
-    c_ids = set(e for e in current.values())
-    ts_ids = set(e for e in timeline_start.values())
-    p_ids = set(e for e in previous.values())
-    tc_ids = set(e for e in timeline_contains.values())
+    c_ids = set(e for e in itervalues(current))
+    ts_ids = set(e for e in itervalues(timeline_start))
+    p_ids = set(e for e in itervalues(previous))
+    tc_ids = set(e for e in itervalues(timeline_contains))
 
     # If we are lazyloading room members, we explicitly add the membership events
     # for the senders in the timeline into the state block returned by /sync,
@@ -1597,7 +1857,7 @@ def _calculate_state(
 
     if lazy_load_members:
         p_ids.difference_update(
-            e for t, e in timeline_start.iteritems()
+            e for t, e in iteritems(timeline_start)
             if t[0] == EventTypes.Member
         )
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 2d2d3d5a0d..a61bbf9392 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -20,6 +20,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import AuthError, SynapseError
 from synapse.types import UserID, get_domain_from_id
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -62,17 +63,28 @@ class TypingHandler(object):
         self._member_typing_until = {}  # clock time we expect to stop
         self._member_last_federation_poke = {}
 
-        # map room IDs to serial numbers
-        self._room_serials = {}
         self._latest_room_serial = 0
-        # map room IDs to sets of users currently typing
-        self._room_typing = {}
+        self._reset()
+
+        # caches which room_ids changed at which serials
+        self._typing_stream_change_cache = StreamChangeCache(
+            "TypingStreamChangeCache", self._latest_room_serial,
+        )
 
         self.clock.looping_call(
             self._handle_timeouts,
             5000,
         )
 
+    def _reset(self):
+        """
+        Reset the typing handler's data caches.
+        """
+        # map room IDs to serial numbers
+        self._room_serials = {}
+        # map room IDs to sets of users currently typing
+        self._room_typing = {}
+
     def _handle_timeouts(self):
         logger.info("Checking for typing timeouts")
 
@@ -218,6 +230,7 @@ class TypingHandler(object):
 
             for domain in set(get_domain_from_id(u) for u in users):
                 if domain != self.server_name:
+                    logger.debug("sending typing update to %s", domain)
                     self.federation.send_edu(
                         destination=domain,
                         edu_type="m.typing",
@@ -274,19 +287,29 @@ class TypingHandler(object):
 
         self._latest_room_serial += 1
         self._room_serials[member.room_id] = self._latest_room_serial
+        self._typing_stream_change_cache.entity_has_changed(
+            member.room_id, self._latest_room_serial,
+        )
 
         self.notifier.on_new_event(
             "typing_key", self._latest_room_serial, rooms=[member.room_id]
         )
 
     def get_all_typing_updates(self, last_id, current_id):
-        # TODO: Work out a way to do this without scanning the entire state.
         if last_id == current_id:
             return []
 
+        changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
+            last_id,
+        )
+
+        if changed_rooms is None:
+            changed_rooms = self._room_serials
+
         rows = []
-        for room_id, serial in self._room_serials.items():
-            if last_id < serial and serial <= current_id:
+        for room_id in changed_rooms:
+            serial = self._room_serials[room_id]
+            if last_id < serial <= current_id:
                 typing = self._room_typing[room_id]
                 rows.append((serial, room_id, list(typing)))
         rows.sort()
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 37dda64587..f11b430126 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,6 +20,7 @@ from six import iteritems
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.roommember import ProfileInfo
 from synapse.types import get_localpart_from_id
 from synapse.util.metrics import Measure
@@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
         """
         return self.store.search_user_dir(user_id, search_term, limit)
 
-    @defer.inlineCallbacks
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
@@ -108,17 +108,23 @@ class UserDirectoryHandler(object):
         if self._is_processing:
             return
 
+        @defer.inlineCallbacks
+        def process():
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._is_processing = False
+
         self._is_processing = True
-        try:
-            yield self._unsafe_process()
-        finally:
-            self._is_processing = False
+        run_as_background_process("user_directory.notify_new_event", process)
 
     @defer.inlineCallbacks
     def handle_local_profile_change(self, user_id, profile):
         """Called to update index of our local user profiles when they change
         irrespective of any rooms the user may be in.
         """
+        # FIXME(#3714): We should probably do this in the same worker as all
+        # the other changes.
         yield self.store.update_profile_in_user_dir(
             user_id, profile.display_name, profile.avatar_url, None,
         )
@@ -127,6 +133,8 @@ class UserDirectoryHandler(object):
     def handle_user_deactivated(self, user_id):
         """Called when a user ID is deactivated
         """
+        # FIXME(#3714): We should probably do this in the same worker as all
+        # the other changes.
         yield self.store.remove_from_user_dir(user_id)
         yield self.store.remove_from_user_in_public_room(user_id)