summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py17
-rw-r--r--synapse/handlers/auth.py47
-rw-r--r--synapse/handlers/deactivate_account.py4
-rw-r--r--synapse/handlers/directory.py135
-rw-r--r--synapse/handlers/e2e_keys.py7
-rw-r--r--synapse/handlers/e2e_room_keys.py289
-rw-r--r--synapse/handlers/federation.py413
-rw-r--r--synapse/handlers/groups_local.py18
-rw-r--r--synapse/handlers/initial_sync.py4
-rw-r--r--synapse/handlers/message.py50
-rw-r--r--synapse/handlers/pagination.py24
-rw-r--r--synapse/handlers/profile.py10
-rw-r--r--synapse/handlers/receipts.py2
-rw-r--r--synapse/handlers/register.py36
-rw-r--r--synapse/handlers/room.py428
-rw-r--r--synapse/handlers/room_list.py13
-rw-r--r--synapse/handlers/room_member.py5
-rw-r--r--synapse/handlers/search.py22
-rw-r--r--synapse/handlers/sync.py290
-rw-r--r--synapse/handlers/typing.py23
-rw-r--r--synapse/handlers/user_directory.py14
21 files changed, 1438 insertions, 413 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index f0f89af7dc..17eedf4dbf 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -28,6 +28,7 @@ from synapse.metrics import (
     event_processing_loop_room_count,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import log_failure
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.metrics import Measure
 
@@ -36,17 +37,6 @@ logger = logging.getLogger(__name__)
 events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
 
 
-def log_failure(failure):
-    logger.error(
-        "Application Services Failure",
-        exc_info=(
-            failure.type,
-            failure.value,
-            failure.getTracebackObject()
-        )
-    )
-
-
 class ApplicationServicesHandler(object):
 
     def __init__(self, hs):
@@ -112,7 +102,10 @@ class ApplicationServicesHandler(object):
 
                         if not self.started_scheduler:
                             def start_scheduler():
-                                return self.scheduler.start().addErrback(log_failure)
+                                return self.scheduler.start().addErrback(
+                                    log_failure, "Application Services Failure",
+                                )
+
                             run_as_background_process("as_scheduler", start_scheduler)
                             self.started_scheduler = True
 
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 4a81bd2ba9..85fc1fc525 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
 import pymacaroons
 from canonicaljson import json
 
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.web.client import PartialDownloadError
 
 import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
+from synapse.util import logcontext
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
 
 from ._base import BaseHandler
 
@@ -59,6 +59,7 @@ class AuthHandler(BaseHandler):
             LoginType.EMAIL_IDENTITY: self._check_email_identity,
             LoginType.MSISDN: self._check_msisdn,
             LoginType.DUMMY: self._check_dummy_auth,
+            LoginType.TERMS: self._check_terms_auth,
         }
         self.bcrypt_rounds = hs.config.bcrypt_rounds
 
@@ -431,6 +432,9 @@ class AuthHandler(BaseHandler):
     def _check_dummy_auth(self, authdict, _):
         return defer.succeed(True)
 
+    def _check_terms_auth(self, authdict, _):
+        return defer.succeed(True)
+
     @defer.inlineCallbacks
     def _check_threepid(self, medium, authdict):
         if 'threepid_creds' not in authdict:
@@ -462,6 +466,22 @@ class AuthHandler(BaseHandler):
     def _get_params_recaptcha(self):
         return {"public_key": self.hs.config.recaptcha_public_key}
 
+    def _get_params_terms(self):
+        return {
+            "policies": {
+                "privacy_policy": {
+                    "version": self.hs.config.user_consent_version,
+                    "en": {
+                        "name": "Privacy Policy",
+                        "url": "%s/_matrix/consent?v=%s" % (
+                            self.hs.config.public_baseurl,
+                            self.hs.config.user_consent_version,
+                        ),
+                    },
+                },
+            },
+        }
+
     def _auth_dict_for_flows(self, flows, session):
         public_flows = []
         for f in flows:
@@ -469,6 +489,7 @@ class AuthHandler(BaseHandler):
 
         get_params = {
             LoginType.RECAPTCHA: self._get_params_recaptcha,
+            LoginType.TERMS: self._get_params_terms,
         }
 
         params = {}
@@ -884,40 +905,32 @@ class AuthHandler(BaseHandler):
                 bcrypt.gensalt(self.bcrypt_rounds),
             ).decode('ascii')
 
-        return make_deferred_yieldable(
-            threads.deferToThreadPool(
-                self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
-            ),
-        )
+        return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
 
         Args:
             password (unicode): Password to hash.
-            stored_hash (unicode): Expected hash value.
+            stored_hash (bytes): Expected hash value.
 
         Returns:
             Deferred(bool): Whether self.hash(password) == stored_hash.
         """
-
         def _do_validate_hash():
             # Normalise the Unicode in the password
             pw = unicodedata.normalize("NFKC", password)
 
             return bcrypt.checkpw(
                 pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
-                stored_hash.encode('utf8')
+                stored_hash
             )
 
         if stored_hash:
-            return make_deferred_yieldable(
-                threads.deferToThreadPool(
-                    self.hs.get_reactor(),
-                    self.hs.get_reactor().getThreadPool(),
-                    _do_validate_hash,
-                ),
-            )
+            if not isinstance(stored_hash, bytes):
+                stored_hash = stored_hash.encode('ascii')
+
+            return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
         else:
             return defer.succeed(False)
 
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b078df4a76..75fe50c42c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,8 +17,8 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, create_requester
-from synapse.util.logcontext import run_in_background
 
 from ._base import BaseHandler
 
@@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
             None
         """
         if not self._user_parter_running:
-            run_in_background(self._user_parter_loop)
+            run_as_background_process("user_parter_loop", self._user_parter_loop)
 
     @defer.inlineCallbacks
     def _user_parter_loop(self):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index ef866da1b6..0699731c13 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -20,7 +20,14 @@ import string
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
-from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    CodeMessageException,
+    Codes,
+    NotFoundError,
+    StoreError,
+    SynapseError,
+)
 from synapse.types import RoomAlias, UserID, get_domain_from_id
 
 from ._base import BaseHandler
@@ -36,6 +43,7 @@ class DirectoryHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self.appservice_handler = hs.get_application_service_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.config = hs.config
 
         self.federation = hs.get_federation_client()
         hs.get_federation_registry().register_query_handler(
@@ -73,43 +81,96 @@ class DirectoryHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def create_association(self, user_id, room_alias, room_id, servers=None):
-        # association creation for human users
-        # TODO(erikj): Do user auth.
+    def create_association(self, requester, room_alias, room_id, servers=None,
+                           send_event=True):
+        """Attempt to create a new alias
 
-        if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
-            raise SynapseError(
-                403, "This user is not permitted to create this alias",
-            )
+        Args:
+            requester (Requester)
+            room_alias (RoomAlias)
+            room_id (str)
+            servers (list[str]|None): List of servers that others servers
+                should try and join via
+            send_event (bool): Whether to send an updated m.room.aliases event
 
-        can_create = yield self.can_modify_alias(
-            room_alias,
-            user_id=user_id
-        )
-        if not can_create:
-            raise SynapseError(
-                400, "This alias is reserved by an application service.",
-                errcode=Codes.EXCLUSIVE
+        Returns:
+            Deferred
+        """
+
+        user_id = requester.user.to_string()
+
+        service = requester.app_service
+        if service:
+            if not service.is_interested_in_alias(room_alias.to_string()):
+                raise SynapseError(
+                    400, "This application service has not reserved"
+                    " this kind of alias.", errcode=Codes.EXCLUSIVE
+                )
+        else:
+            if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
+                raise AuthError(
+                    403, "This user is not permitted to create this alias",
+                )
+
+            if not self.config.is_alias_creation_allowed(user_id, room_alias.to_string()):
+                # Lets just return a generic message, as there may be all sorts of
+                # reasons why we said no. TODO: Allow configurable error messages
+                # per alias creation rule?
+                raise SynapseError(
+                    403, "Not allowed to create alias",
+                )
+
+            can_create = yield self.can_modify_alias(
+                room_alias,
+                user_id=user_id
             )
+            if not can_create:
+                raise AuthError(
+                    400, "This alias is reserved by an application service.",
+                    errcode=Codes.EXCLUSIVE
+                )
+
         yield self._create_association(room_alias, room_id, servers, creator=user_id)
+        if send_event:
+            yield self.send_room_alias_update_event(
+                requester,
+                room_id
+            )
 
     @defer.inlineCallbacks
-    def create_appservice_association(self, service, room_alias, room_id,
-                                      servers=None):
-        if not service.is_interested_in_alias(room_alias.to_string()):
-            raise SynapseError(
-                400, "This application service has not reserved"
-                " this kind of alias.", errcode=Codes.EXCLUSIVE
-            )
+    def delete_association(self, requester, room_alias, send_event=True):
+        """Remove an alias from the directory
 
-        # association creation for app services
-        yield self._create_association(room_alias, room_id, servers)
+        (this is only meant for human users; AS users should call
+        delete_appservice_association)
 
-    @defer.inlineCallbacks
-    def delete_association(self, requester, user_id, room_alias):
-        # association deletion for human users
+        Args:
+            requester (Requester):
+            room_alias (RoomAlias):
+            send_event (bool): Whether to send an updated m.room.aliases event.
+                Note that, if we delete the canonical alias, we will always attempt
+                to send an m.room.canonical_alias event
+
+        Returns:
+            Deferred[unicode]: room id that the alias used to point to
+
+        Raises:
+            NotFoundError: if the alias doesn't exist
+
+            AuthError: if the user doesn't have perms to delete the alias (ie, the user
+                is neither the creator of the alias, nor a server admin.
+
+            SynapseError: if the alias belongs to an AS
+        """
+        user_id = requester.user.to_string()
+
+        try:
+            can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+        except StoreError as e:
+            if e.code == 404:
+                raise NotFoundError("Unknown room alias")
+            raise
 
-        can_delete = yield self._user_can_delete_alias(room_alias, user_id)
         if not can_delete:
             raise AuthError(
                 403, "You don't have permission to delete the alias.",
@@ -128,11 +189,11 @@ class DirectoryHandler(BaseHandler):
         room_id = yield self._delete_association(room_alias)
 
         try:
-            yield self.send_room_alias_update_event(
-                requester,
-                requester.user.to_string(),
-                room_id
-            )
+            if send_event:
+                yield self.send_room_alias_update_event(
+                    requester,
+                    room_id
+                )
 
             yield self._update_canonical_alias(
                 requester,
@@ -248,7 +309,7 @@ class DirectoryHandler(BaseHandler):
             )
 
     @defer.inlineCallbacks
-    def send_room_alias_update_event(self, requester, user_id, room_id):
+    def send_room_alias_update_event(self, requester, room_id):
         aliases = yield self.store.get_aliases_for_room(room_id)
 
         yield self.event_creation_handler.create_and_send_nonmember_event(
@@ -257,7 +318,7 @@ class DirectoryHandler(BaseHandler):
                 "type": EventTypes.Aliases,
                 "state_key": self.hs.hostname,
                 "room_id": room_id,
-                "sender": user_id,
+                "sender": requester.user.to_string(),
                 "content": {"aliases": aliases},
             },
             ratelimit=False
@@ -320,7 +381,7 @@ class DirectoryHandler(BaseHandler):
     def _user_can_delete_alias(self, alias, user_id):
         creator = yield self.store.get_room_alias_creator(alias.to_string())
 
-        if creator and creator == user_id:
+        if creator is not None and creator == user_id:
             defer.returnValue(True)
 
         is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 5816bf8b4f..9dc46aa15f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -330,7 +330,8 @@ class E2eKeysHandler(object):
                         (algorithm, key_id, ex_json, key)
                     )
             else:
-                new_keys.append((algorithm, key_id, encode_canonical_json(key)))
+                new_keys.append((
+                    algorithm, key_id, encode_canonical_json(key).decode('ascii')))
 
         yield self.store.add_e2e_one_time_keys(
             user_id, device_id, time_now, new_keys
@@ -340,7 +341,7 @@ class E2eKeysHandler(object):
 def _exception_to_failure(e):
     if isinstance(e, CodeMessageException):
         return {
-            "status": e.code, "message": e.message,
+            "status": e.code, "message": str(e),
         }
 
     if isinstance(e, NotRetryingDestination):
@@ -358,7 +359,7 @@ def _exception_to_failure(e):
     # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
     # give a string for e.message, which json then fails to serialize.
     return {
-        "status": 503, "message": str(e.message),
+        "status": 503, "message": str(e),
     }
 
 
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
new file mode 100644
index 0000000000..5edb3cfe04
--- /dev/null
+++ b/synapse/handlers/e2e_room_keys.py
@@ -0,0 +1,289 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017, 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from six import iteritems
+
+from twisted.internet import defer
+
+from synapse.api.errors import RoomKeysVersionError, StoreError, SynapseError
+from synapse.util.async_helpers import Linearizer
+
+logger = logging.getLogger(__name__)
+
+
+class E2eRoomKeysHandler(object):
+    """
+    Implements an optional realtime backup mechanism for encrypted E2E megolm room keys.
+    This gives a way for users to store and recover their megolm keys if they lose all
+    their clients. It should also extend easily to future room key mechanisms.
+    The actual payload of the encrypted keys is completely opaque to the handler.
+    """
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+        # Used to lock whenever a client is uploading key data.  This prevents collisions
+        # between clients trying to upload the details of a new session, given all
+        # clients belonging to a user will receive and try to upload a new session at
+        # roughly the same time.  Also used to lock out uploads when the key is being
+        # changed.
+        self._upload_linearizer = Linearizer("upload_room_keys_lock")
+
+    @defer.inlineCallbacks
+    def get_room_keys(self, user_id, version, room_id=None, session_id=None):
+        """Bulk get the E2E room keys for a given backup, optionally filtered to a given
+        room, or a given session.
+        See EndToEndRoomKeyStore.get_e2e_room_keys for full details.
+
+        Args:
+            user_id(str): the user whose keys we're getting
+            version(str): the version ID of the backup we're getting keys from
+            room_id(string): room ID to get keys for, for None to get keys for all rooms
+            session_id(string): session ID to get keys for, for None to get keys for all
+                sessions
+        Returns:
+            A deferred list of dicts giving the session_data and message metadata for
+            these room keys.
+        """
+
+        # we deliberately take the lock to get keys so that changing the version
+        # works atomically
+        with (yield self._upload_linearizer.queue(user_id)):
+            results = yield self.store.get_e2e_room_keys(
+                user_id, version, room_id, session_id
+            )
+
+            if results['rooms'] == {}:
+                raise SynapseError(404, "No room_keys found")
+
+            defer.returnValue(results)
+
+    @defer.inlineCallbacks
+    def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
+        """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
+        room or a given session.
+        See EndToEndRoomKeyStore.delete_e2e_room_keys for full details.
+
+        Args:
+            user_id(str): the user whose backup we're deleting
+            version(str): the version ID of the backup we're deleting
+            room_id(string): room ID to delete keys for, for None to delete keys for all
+                rooms
+            session_id(string): session ID to delete keys for, for None to delete keys
+                for all sessions
+        Returns:
+            A deferred of the deletion transaction
+        """
+
+        # lock for consistency with uploading
+        with (yield self._upload_linearizer.queue(user_id)):
+            yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
+
+    @defer.inlineCallbacks
+    def upload_room_keys(self, user_id, version, room_keys):
+        """Bulk upload a list of room keys into a given backup version, asserting
+        that the given version is the current backup version.  room_keys are merged
+        into the current backup as described in RoomKeysServlet.on_PUT().
+
+        Args:
+            user_id(str): the user whose backup we're setting
+            version(str): the version ID of the backup we're updating
+            room_keys(dict): a nested dict describing the room_keys we're setting:
+
+        {
+            "rooms": {
+                "!abc:matrix.org": {
+                    "sessions": {
+                        "c0ff33": {
+                            "first_message_index": 1,
+                            "forwarded_count": 1,
+                            "is_verified": false,
+                            "session_data": "SSBBTSBBIEZJU0gK"
+                        }
+                    }
+                }
+            }
+        }
+
+        Raises:
+            SynapseError: with code 404 if there are no versions defined
+            RoomKeysVersionError: if the uploaded version is not the current version
+        """
+
+        # TODO: Validate the JSON to make sure it has the right keys.
+
+        # XXX: perhaps we should use a finer grained lock here?
+        with (yield self._upload_linearizer.queue(user_id)):
+
+            # Check that the version we're trying to upload is the current version
+            try:
+                version_info = yield self.store.get_e2e_room_keys_version_info(user_id)
+            except StoreError as e:
+                if e.code == 404:
+                    raise SynapseError(404, "Version '%s' not found" % (version,))
+                else:
+                    raise
+
+            if version_info['version'] != version:
+                # Check that the version we're trying to upload actually exists
+                try:
+                    version_info = yield self.store.get_e2e_room_keys_version_info(
+                        user_id, version,
+                    )
+                    # if we get this far, the version must exist
+                    raise RoomKeysVersionError(current_version=version_info['version'])
+                except StoreError as e:
+                    if e.code == 404:
+                        raise SynapseError(404, "Version '%s' not found" % (version,))
+                    else:
+                        raise
+
+            # go through the room_keys.
+            # XXX: this should/could be done concurrently, given we're in a lock.
+            for room_id, room in iteritems(room_keys['rooms']):
+                for session_id, session in iteritems(room['sessions']):
+                    yield self._upload_room_key(
+                        user_id, version, room_id, session_id, session
+                    )
+
+    @defer.inlineCallbacks
+    def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
+        """Upload a given room_key for a given room and session into a given
+        version of the backup.  Merges the key with any which might already exist.
+
+        Args:
+            user_id(str): the user whose backup we're setting
+            version(str): the version ID of the backup we're updating
+            room_id(str): the ID of the room whose keys we're setting
+            session_id(str): the session whose room_key we're setting
+            room_key(dict): the room_key being set
+        """
+
+        # get the room_key for this particular row
+        current_room_key = None
+        try:
+            current_room_key = yield self.store.get_e2e_room_key(
+                user_id, version, room_id, session_id
+            )
+        except StoreError as e:
+            if e.code == 404:
+                pass
+            else:
+                raise
+
+        if self._should_replace_room_key(current_room_key, room_key):
+            yield self.store.set_e2e_room_key(
+                user_id, version, room_id, session_id, room_key
+            )
+
+    @staticmethod
+    def _should_replace_room_key(current_room_key, room_key):
+        """
+        Determine whether to replace a given current_room_key (if any)
+        with a newly uploaded room_key backup
+
+        Args:
+            current_room_key (dict): Optional, the current room_key dict if any
+            room_key (dict): The new room_key dict which may or may not be fit to
+                replace the current_room_key
+
+        Returns:
+            True if current_room_key should be replaced by room_key in the backup
+        """
+
+        if current_room_key:
+            # spelt out with if/elifs rather than nested boolean expressions
+            # purely for legibility.
+
+            if room_key['is_verified'] and not current_room_key['is_verified']:
+                return True
+            elif (
+                room_key['first_message_index'] <
+                current_room_key['first_message_index']
+            ):
+                return True
+            elif room_key['forwarded_count'] < current_room_key['forwarded_count']:
+                return True
+            else:
+                return False
+        return True
+
+    @defer.inlineCallbacks
+    def create_version(self, user_id, version_info):
+        """Create a new backup version.  This automatically becomes the new
+        backup version for the user's keys; previous backups will no longer be
+        writeable to.
+
+        Args:
+            user_id(str): the user whose backup version we're creating
+            version_info(dict): metadata about the new version being created
+
+        {
+            "algorithm": "m.megolm_backup.v1",
+            "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+        }
+
+        Returns:
+            A deferred of a string that gives the new version number.
+        """
+
+        # TODO: Validate the JSON to make sure it has the right keys.
+
+        # lock everyone out until we've switched version
+        with (yield self._upload_linearizer.queue(user_id)):
+            new_version = yield self.store.create_e2e_room_keys_version(
+                user_id, version_info
+            )
+            defer.returnValue(new_version)
+
+    @defer.inlineCallbacks
+    def get_version_info(self, user_id, version=None):
+        """Get the info about a given version of the user's backup
+
+        Args:
+            user_id(str): the user whose current backup version we're querying
+            version(str): Optional; if None gives the most recent version
+                otherwise a historical one.
+        Raises:
+            StoreError: code 404 if the requested backup version doesn't exist
+        Returns:
+            A deferred of a info dict that gives the info about the new version.
+
+        {
+            "version": "1234",
+            "algorithm": "m.megolm_backup.v1",
+            "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
+        }
+        """
+
+        with (yield self._upload_linearizer.queue(user_id)):
+            res = yield self.store.get_e2e_room_keys_version_info(user_id, version)
+            defer.returnValue(res)
+
+    @defer.inlineCallbacks
+    def delete_version(self, user_id, version=None):
+        """Deletes a given version of the user's e2e_room_keys backup
+
+        Args:
+            user_id(str): the user whose current backup version we're deleting
+            version(str): the version id of the backup being deleted
+        Raises:
+            StoreError: code 404 if this backup version doesn't exist
+        """
+
+        with (yield self._upload_linearizer.queue(user_id)):
+            yield self.store.delete_e2e_room_keys_version(user_id, version)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3fa7a98445..cd5b9bbb19 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -18,7 +18,6 @@
 
 import itertools
 import logging
-import sys
 
 import six
 from six import iteritems, itervalues
@@ -54,7 +53,7 @@ from synapse.replication.http.federation import (
     ReplicationFederationSendEventsRestServlet,
 )
 from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
-from synapse.state import resolve_events_with_factory
+from synapse.state import StateResolutionStore, resolve_events_with_store
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import logcontext, unwrapFirstError
 from synapse.util.async_helpers import Linearizer
@@ -69,6 +68,27 @@ from ._base import BaseHandler
 logger = logging.getLogger(__name__)
 
 
+def shortstr(iterable, maxitems=5):
+    """If iterable has maxitems or fewer, return the stringification of a list
+    containing those items.
+
+    Otherwise, return the stringification of a a list with the first maxitems items,
+    followed by "...".
+
+    Args:
+        iterable (Iterable): iterable to truncate
+        maxitems (int): number of items to return before truncating
+
+    Returns:
+        unicode
+    """
+
+    items = list(itertools.islice(iterable, maxitems + 1))
+    if len(items) <= maxitems:
+        return str(items)
+    return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
 class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
@@ -85,7 +105,7 @@ class FederationHandler(BaseHandler):
 
         self.hs = hs
 
-        self.store = hs.get_datastore()
+        self.store = hs.get_datastore()  # type: synapse.storage.DataStore
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
@@ -114,9 +134,8 @@ class FederationHandler(BaseHandler):
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
 
     @defer.inlineCallbacks
-    @log_function
     def on_receive_pdu(
-            self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+            self, origin, pdu, sent_to_us_directly=False,
     ):
         """ Process a PDU received via a federation /send/ transaction, or
         via backfill of missing prev_events
@@ -125,14 +144,23 @@ class FederationHandler(BaseHandler):
             origin (str): server which initiated the /send/ transaction. Will
                 be used to fetch missing events or state.
             pdu (FrozenEvent): received PDU
-            get_missing (bool): True if we should fetch missing prev_events
+            sent_to_us_directly (bool): True if this event was pushed to us; False if
+                we pulled it as the result of a missing prev_event.
 
         Returns (Deferred): completes with None
         """
 
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
+        logger.info(
+            "[%s %s] handling received PDU: %s",
+            room_id, event_id, pdu,
+        )
+
         # We reprocess pdus when we have seen them only as outliers
         existing = yield self.store.get_event(
-            pdu.event_id,
+            event_id,
             allow_none=True,
             allow_rejected=True,
         )
@@ -147,7 +175,7 @@ class FederationHandler(BaseHandler):
             )
         )
         if already_seen:
-            logger.debug("Already seen pdu %s", pdu.event_id)
+            logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
             return
 
         # do some initial sanity-checking of the event. In particular, make
@@ -156,6 +184,7 @@ class FederationHandler(BaseHandler):
         try:
             self._sanity_check_event(pdu)
         except SynapseError as err:
+            logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
             raise FederationError(
                 "ERROR",
                 err.code,
@@ -165,10 +194,12 @@ class FederationHandler(BaseHandler):
 
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
-        if pdu.room_id in self.room_queues:
-            logger.info("Ignoring PDU %s for room %s from %s for now; join "
-                        "in progress", pdu.event_id, pdu.room_id, origin)
-            self.room_queues[pdu.room_id].append((pdu, origin))
+        if room_id in self.room_queues:
+            logger.info(
+                "[%s %s] Queuing PDU from %s for now: join in progress",
+                room_id, event_id, origin,
+            )
+            self.room_queues[room_id].append((pdu, origin))
             return
 
         # If we're no longer in the room just ditch the event entirely. This
@@ -179,7 +210,7 @@ class FederationHandler(BaseHandler):
         # we should check if we *are* in fact in the room. If we are then we
         # can magically rejoin the room.
         is_in_room = yield self.auth.check_host_in_room(
-            pdu.room_id,
+            room_id,
             self.server_name
         )
         if not is_in_room:
@@ -188,8 +219,8 @@ class FederationHandler(BaseHandler):
             )
             if was_in_room:
                 logger.info(
-                    "Ignoring PDU %s for room %s from %s as we've left the room!",
-                    pdu.event_id, pdu.room_id, origin,
+                    "[%s %s] Ignoring PDU from %s as we've left the room",
+                    room_id, event_id, origin,
                 )
                 defer.returnValue(None)
 
@@ -204,8 +235,8 @@ class FederationHandler(BaseHandler):
             )
 
             logger.debug(
-                "_handle_new_pdu min_depth for %s: %d",
-                pdu.room_id, min_depth
+                "[%s %s] min_depth: %d",
+                room_id, event_id, min_depth,
             )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
@@ -218,17 +249,18 @@ class FederationHandler(BaseHandler):
                 # send to the clients.
                 pdu.internal_metadata.outlier = True
             elif min_depth and pdu.depth > min_depth:
-                if get_missing and prevs - seen:
+                missing_prevs = prevs - seen
+                if sent_to_us_directly and missing_prevs:
                     # If we're missing stuff, ensure we only fetch stuff one
                     # at a time.
                     logger.info(
-                        "Acquiring lock for room %r to fetch %d missing events: %r...",
-                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+                        "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
                     with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
                         logger.info(
-                            "Acquired lock for room %r to fetch %d missing events",
-                            pdu.room_id, len(prevs - seen),
+                            "[%s %s] Acquired room lock to fetch %d missing prev_events",
+                            room_id, event_id, len(missing_prevs),
                         )
 
                         yield self._get_missing_events_for_pdu(
@@ -241,69 +273,150 @@ class FederationHandler(BaseHandler):
 
                         if not prevs - seen:
                             logger.info(
-                                "Found all missing prev events for %s", pdu.event_id
+                                "[%s %s] Found all missing prev_events",
+                                room_id, event_id,
                             )
-                elif prevs - seen:
+                elif missing_prevs:
                     logger.info(
-                        "Not fetching %d missing events for room %r,event %s: %r...",
-                        len(prevs - seen), pdu.room_id, pdu.event_id,
-                        list(prevs - seen)[:5],
+                        "[%s %s] Not recursively fetching %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
 
-            if sent_to_us_directly and prevs - seen:
-                # If they have sent it to us directly, and the server
-                # isn't telling us about the auth events that it's
-                # made a message referencing, we explode
-                raise FederationError(
-                    "ERROR",
-                    403,
-                    (
-                        "Your server isn't divulging details about prev_events "
-                        "referenced in this event."
-                    ),
-                    affected=pdu.event_id,
-                )
-            elif prevs - seen:
-                # Calculate the state of the previous events, and
-                # de-conflict them to find the current state.
-                state_groups = []
+            if prevs - seen:
+                # We've still not been able to get all of the prev_events for this event.
+                #
+                # In this case, we need to fall back to asking another server in the
+                # federation for the state at this event. That's ok provided we then
+                # resolve the state against other bits of the DAG before using it (which
+                # will ensure that you can't just take over a room by sending an event,
+                # withholding its prev_events, and declaring yourself to be an admin in
+                # the subsequent state request).
+                #
+                # Now, if we're pulling this event as a missing prev_event, then clearly
+                # this event is not going to become the only forward-extremity and we are
+                # guaranteed to resolve its state against our existing forward
+                # extremities, so that should be fine.
+                #
+                # On the other hand, if this event was pushed to us, it is possible for
+                # it to become the only forward-extremity in the room, and we would then
+                # trust its state to be the state for the whole room. This is very bad.
+                # Further, if the event was pushed to us, there is no excuse for us not to
+                # have all the prev_events. We therefore reject any such events.
+                #
+                # XXX this really feels like it could/should be merged with the above,
+                # but there is an interaction with min_depth that I'm not really
+                # following.
+
+                if sent_to_us_directly:
+                    logger.warn(
+                        "[%s %s] Rejecting: failed to fetch %d prev events: %s",
+                        room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
+                    )
+                    raise FederationError(
+                        "ERROR",
+                        403,
+                        (
+                            "Your server isn't divulging details about prev_events "
+                            "referenced in this event."
+                        ),
+                        affected=pdu.event_id,
+                    )
+
+                # Calculate the state after each of the previous events, and
+                # resolve them to find the correct state at the current event.
                 auth_chains = set()
+                event_map = {
+                    event_id: pdu,
+                }
                 try:
                     # Get the state of the events we know about
-                    ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
-                    state_groups.append(ours)
+                    ours = yield self.store.get_state_groups_ids(room_id, seen)
+
+                    # state_maps is a list of mappings from (type, state_key) to event_id
+                    # type: list[dict[tuple[str, str], str]]
+                    state_maps = list(ours.values())
+
+                    # we don't need this any more, let's delete it.
+                    del ours
 
                     # Ask the remote server for the states we don't
                     # know about
                     for p in prevs - seen:
-                        state, got_auth_chain = (
-                            yield self.federation_client.get_state_for_room(
-                                origin, pdu.room_id, p
-                            )
-                        )
-                        auth_chains.update(got_auth_chain)
-                        state_group = {(x.type, x.state_key): x.event_id for x in state}
-                        state_groups.append(state_group)
-
-                    # Resolve any conflicting state
-                    def fetch(ev_ids):
-                        return self.store.get_events(
-                            ev_ids, get_prev_content=False, check_redacted=False
+                        logger.info(
+                            "[%s %s] Requesting state at missing prev_event %s",
+                            room_id, event_id, p,
                         )
 
-                    room_version = yield self.store.get_room_version(pdu.room_id)
-                    state_map = yield resolve_events_with_factory(
-                        room_version, state_groups, {pdu.event_id: pdu}, fetch
+                        with logcontext.nested_logging_context(p):
+                            # note that if any of the missing prevs share missing state or
+                            # auth events, the requests to fetch those events are deduped
+                            # by the get_pdu_cache in federation_client.
+                            remote_state, got_auth_chain = (
+                                yield self.federation_client.get_state_for_room(
+                                    origin, room_id, p,
+                                )
+                            )
+
+                            # we want the state *after* p; get_state_for_room returns the
+                            # state *before* p.
+                            remote_event = yield self.federation_client.get_pdu(
+                                [origin], p, outlier=True,
+                            )
+
+                            if remote_event is None:
+                                raise Exception(
+                                    "Unable to get missing prev_event %s" % (p, )
+                                )
+
+                            if remote_event.is_state():
+                                remote_state.append(remote_event)
+
+                            # XXX hrm I'm not convinced that duplicate events will compare
+                            # for equality, so I'm not sure this does what the author
+                            # hoped.
+                            auth_chains.update(got_auth_chain)
+
+                            remote_state_map = {
+                                (x.type, x.state_key): x.event_id for x in remote_state
+                            }
+                            state_maps.append(remote_state_map)
+
+                            for x in remote_state:
+                                event_map[x.event_id] = x
+
+                    room_version = yield self.store.get_room_version(room_id)
+                    state_map = yield resolve_events_with_store(
+                        room_version, state_maps, event_map,
+                        state_res_store=StateResolutionStore(self.store),
+                    )
+
+                    # We need to give _process_received_pdu the actual state events
+                    # rather than event ids, so generate that now.
+
+                    # First though we need to fetch all the events that are in
+                    # state_map, so we can build up the state below.
+                    evs = yield self.store.get_events(
+                        list(state_map.values()),
+                        get_prev_content=False,
+                        check_redacted=False,
                     )
+                    event_map.update(evs)
 
-                    state = (yield self.store.get_events(state_map.values())).values()
+                    state = [
+                        event_map[e] for e in six.itervalues(state_map)
+                    ]
                     auth_chain = list(auth_chains)
                 except Exception:
+                    logger.warn(
+                        "[%s %s] Error attempting to resolve state at missing "
+                        "prev_events",
+                        room_id, event_id, exc_info=True,
+                    )
                     raise FederationError(
                         "ERROR",
                         403,
                         "We can't get valid state history.",
-                        affected=pdu.event_id,
+                        affected=event_id,
                     )
 
         yield self._process_received_pdu(
@@ -322,15 +435,16 @@ class FederationHandler(BaseHandler):
             prevs (set(str)): List of event ids which we are missing
             min_depth (int): Minimum depth of events to return.
         """
-        # We recalculate seen, since it may have changed.
+
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
         seen = yield self.store.have_seen_events(prevs)
 
         if not prevs - seen:
             return
 
-        latest = yield self.store.get_latest_event_ids_in_room(
-            pdu.room_id
-        )
+        latest = yield self.store.get_latest_event_ids_in_room(room_id)
 
         # We add the prev events that we have seen to the latest
         # list to ensure the remote server doesn't give them to us
@@ -338,8 +452,8 @@ class FederationHandler(BaseHandler):
         latest |= seen
 
         logger.info(
-            "Missing %d events for room %r pdu %s: %r...",
-            len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+            "[%s %s]: Requesting missing events between %s and %s",
+            room_id, event_id, shortstr(latest), event_id,
         )
 
         # XXX: we set timeout to 10s to help workaround
@@ -360,49 +474,88 @@ class FederationHandler(BaseHandler):
         # apparently.
         #
         # see https://github.com/matrix-org/synapse/pull/1744
+        #
+        # ----
+        #
+        # Update richvdh 2018/09/18: There are a number of problems with timing this
+        # request out agressively on the client side:
+        #
+        # - it plays badly with the server-side rate-limiter, which starts tarpitting you
+        #   if you send too many requests at once, so you end up with the server carefully
+        #   working through the backlog of your requests, which you have already timed
+        #   out.
+        #
+        # - for this request in particular, we now (as of
+        #   https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
+        #   server can't produce a plausible-looking set of prev_events - so we becone
+        #   much more likely to reject the event.
+        #
+        # - contrary to what it says above, we do *not* fall back to fetching fresh state
+        #   for the room if get_missing_events times out. Rather, we give up processing
+        #   the PDU whose prevs we are missing, which then makes it much more likely that
+        #   we'll end up back here for the *next* PDU in the list, which exacerbates the
+        #   problem.
+        #
+        # - the agressive 10s timeout was introduced to deal with incoming federation
+        #   requests taking 8 hours to process. It's not entirely clear why that was going
+        #   on; certainly there were other issues causing traffic storms which are now
+        #   resolved, and I think in any case we may be more sensible about our locking
+        #   now. We're *certainly* more sensible about our logging.
+        #
+        # All that said: Let's try increasing the timout to 60s and see what happens.
 
         missing_events = yield self.federation_client.get_missing_events(
             origin,
-            pdu.room_id,
+            room_id,
             earliest_events_ids=list(latest),
             latest_events=[pdu],
             limit=10,
             min_depth=min_depth,
-            timeout=10000,
+            timeout=60000,
         )
 
         logger.info(
-            "Got %d events: %r...",
-            len(missing_events), [e.event_id for e in missing_events[:5]]
+            "[%s %s]: Got %d prev_events: %s",
+            room_id, event_id, len(missing_events), shortstr(missing_events),
         )
 
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
         missing_events.sort(key=lambda x: x.depth)
 
-        for e in missing_events:
-            logger.info("Handling found event %s", e.event_id)
-            try:
-                yield self.on_receive_pdu(
-                    origin,
-                    e,
-                    get_missing=False
-                )
-            except FederationError as e:
-                if e.code == 403:
-                    logger.warn("Event %s failed history check.")
-                else:
-                    raise
+        for ev in missing_events:
+            logger.info(
+                "[%s %s] Handling received prev_event %s",
+                room_id, event_id, ev.event_id,
+            )
+            with logcontext.nested_logging_context(ev.event_id):
+                try:
+                    yield self.on_receive_pdu(
+                        origin,
+                        ev,
+                        sent_to_us_directly=False,
+                    )
+                except FederationError as e:
+                    if e.code == 403:
+                        logger.warn(
+                            "[%s %s] Received prev_event %s failed history check.",
+                            room_id, event_id, ev.event_id,
+                        )
+                    else:
+                        raise
 
-    @log_function
     @defer.inlineCallbacks
-    def _process_received_pdu(self, origin, pdu, state, auth_chain):
+    def _process_received_pdu(self, origin, event, state, auth_chain):
         """ Called when we have a new pdu. We need to do auth checks and put it
         through the StateHandler.
         """
-        event = pdu
+        room_id = event.room_id
+        event_id = event.event_id
 
-        logger.debug("Processing event: %s", event)
+        logger.debug(
+            "[%s %s] Processing event: %s",
+            room_id, event_id, event,
+        )
 
         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
@@ -411,15 +564,16 @@ class FederationHandler(BaseHandler):
         # event.
         if state and auth_chain and not event.internal_metadata.is_outlier():
             is_in_room = yield self.auth.check_host_in_room(
-                event.room_id,
+                room_id,
                 self.server_name
             )
         else:
             is_in_room = True
+
         if not is_in_room:
             logger.info(
-                "Got event for room we're not in: %r %r",
-                event.room_id, event.event_id
+                "[%s %s] Got event for room we're not in",
+                room_id, event_id,
             )
 
             try:
@@ -431,7 +585,7 @@ class FederationHandler(BaseHandler):
                     "ERROR",
                     e.code,
                     e.msg,
-                    affected=event.event_id,
+                    affected=event_id,
                 )
 
         else:
@@ -464,6 +618,10 @@ class FederationHandler(BaseHandler):
                     })
                     seen_ids.add(e.event_id)
 
+                logger.info(
+                    "[%s %s] persisting newly-received auth/state events %s",
+                    room_id, event_id, [e["event"].event_id for e in event_infos]
+                )
                 yield self._handle_new_events(origin, event_infos)
 
             try:
@@ -480,12 +638,12 @@ class FederationHandler(BaseHandler):
                     affected=event.event_id,
                 )
 
-        room = yield self.store.get_room(event.room_id)
+        room = yield self.store.get_room(room_id)
 
         if not room:
             try:
                 yield self.store.store_room(
-                    room_id=event.room_id,
+                    room_id=room_id,
                     room_creator_user_id="",
                     is_public=False,
                 )
@@ -513,7 +671,7 @@ class FederationHandler(BaseHandler):
 
                 if newly_joined:
                     user = UserID.from_string(event.state_key)
-                    yield self.user_joined_room(user, event.room_id)
+                    yield self.user_joined_room(user, room_id)
 
     @log_function
     @defer.inlineCallbacks
@@ -594,7 +752,7 @@ class FederationHandler(BaseHandler):
 
         required_auth = set(
             a_id
-            for event in events + state_events.values() + auth_events.values()
+            for event in events + list(state_events.values()) + list(auth_events.values())
             for a_id, _ in event.auth_events
         )
         auth_events.update({
@@ -802,7 +960,7 @@ class FederationHandler(BaseHandler):
                     )
                     continue
                 except NotRetryingDestination as e:
-                    logger.info(e.message)
+                    logger.info(str(e))
                     continue
                 except FederationDeniedError as e:
                     logger.info(e)
@@ -1027,7 +1185,8 @@ class FederationHandler(BaseHandler):
             try:
                 logger.info("Processing queued PDU %s which was received "
                             "while we were joining %s", p.event_id, p.room_id)
-                yield self.on_receive_pdu(origin, p)
+                with logcontext.nested_logging_context(p.event_id):
+                    yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
             except Exception as e:
                 logger.warn(
                     "Error handling queued PDU %s from %s: %s",
@@ -1358,7 +1517,7 @@ class FederationHandler(BaseHandler):
         )
 
         if state_groups:
-            _, state = state_groups.items().pop()
+            _, state = list(state_groups.items()).pop()
             results = state
 
             if event.is_state():
@@ -1430,12 +1589,10 @@ class FederationHandler(BaseHandler):
         else:
             defer.returnValue(None)
 
-    @log_function
     def get_min_depth_for_context(self, context):
         return self.store.get_min_depth(context)
 
     @defer.inlineCallbacks
-    @log_function
     def _handle_new_event(self, origin, event, state=None, auth_events=None,
                           backfilled=False):
         context = yield self._prep_event(
@@ -1444,6 +1601,9 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
+        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+        # hack around with a try/finally instead.
+        success = False
         try:
             if not event.internal_metadata.is_outlier() and not backfilled:
                 yield self.action_generator.handle_push_actions_for_event(
@@ -1454,15 +1614,13 @@ class FederationHandler(BaseHandler):
                 [(event, context)],
                 backfilled=backfilled,
             )
-        except:  # noqa: E722, as we reraise the exception this is fine.
-            tp, value, tb = sys.exc_info()
-
-            logcontext.run_in_background(
-                self.store.remove_push_actions_from_staging,
-                event.event_id,
-            )
-
-            six.reraise(tp, value, tb)
+            success = True
+        finally:
+            if not success:
+                logcontext.run_in_background(
+                    self.store.remove_push_actions_from_staging,
+                    event.event_id,
+                )
 
         defer.returnValue(context)
 
@@ -1475,15 +1633,22 @@ class FederationHandler(BaseHandler):
 
         Notifies about the events where appropriate.
         """
-        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                logcontext.run_in_background(
-                    self._prep_event,
+
+        @defer.inlineCallbacks
+        def prep(ev_info):
+            event = ev_info["event"]
+            with logcontext.nested_logging_context(suffix=event.event_id):
+                res = yield self._prep_event(
                     origin,
-                    ev_info["event"],
+                    event,
                     state=ev_info.get("state"),
                     auth_events=ev_info.get("auth_events"),
                 )
+            defer.returnValue(res)
+
+        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+            [
+                logcontext.run_in_background(prep, ev_info)
                 for ev_info in event_infos
             ], consumeErrors=True,
         ))
@@ -1635,8 +1800,8 @@ class FederationHandler(BaseHandler):
             )
         except AuthError as e:
             logger.warn(
-                "Rejecting %s because %s",
-                event.event_id, e.msg
+                "[%s %s] Rejecting: %s",
+                event.room_id, event.event_id, e.msg
             )
 
             context.rejected = RejectedReason.AUTH_ERROR
@@ -1687,7 +1852,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def on_get_missing_events(self, origin, room_id, earliest_events,
-                              latest_events, limit, min_depth):
+                              latest_events, limit):
         in_room = yield self.auth.check_host_in_room(
             room_id,
             origin
@@ -1696,14 +1861,12 @@ class FederationHandler(BaseHandler):
             raise AuthError(403, "Host not in room.")
 
         limit = min(limit, 20)
-        min_depth = max(min_depth, 0)
 
         missing_events = yield self.store.get_missing_events(
             room_id=room_id,
             earliest_events=earliest_events,
             latest_events=latest_events,
             limit=limit,
-            min_depth=min_depth,
         )
 
         missing_events = yield filter_events_for_server(
@@ -2357,7 +2520,7 @@ class FederationHandler(BaseHandler):
 
             if not backfilled:  # Never notify for backfilled events
                 for event, _ in event_and_contexts:
-                    self._notify_persisted_event(event, max_stream_id)
+                    yield self._notify_persisted_event(event, max_stream_id)
 
     def _notify_persisted_event(self, event, max_stream_id):
         """Checks to see if notifier/pushers should be notified about the
@@ -2390,7 +2553,7 @@ class FederationHandler(BaseHandler):
             extra_users=extra_users
         )
 
-        self.pusher_pool.on_new_notifications(
+        return self.pusher_pool.on_new_notifications(
             event_stream_id, max_stream_id,
         )
 
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 53e5e2648b..173315af6c 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.types import get_domain_from_id
 
 logger = logging.getLogger(__name__)
@@ -37,9 +37,23 @@ def _create_rerouter(func_name):
             )
         else:
             destination = get_domain_from_id(group_id)
-            return getattr(self.transport_client, func_name)(
+            d = getattr(self.transport_client, func_name)(
                 destination, group_id, *args, **kwargs
             )
+
+            # Capture errors returned by the remote homeserver and
+            # re-throw specific errors as SynapseErrors. This is so
+            # when the remote end responds with things like 403 Not
+            # In Group, we can communicate that to the client instead
+            # of a 500.
+            def h(failure):
+                failure.trap(HttpResponseException)
+                e = failure.value
+                if e.code == 403:
+                    raise e.to_synapse_error()
+                return failure
+            d.addErrback(h)
+            return d
     return f
 
 
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index e009395207..563bb3cea3 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -156,7 +156,7 @@ class InitialSyncHandler(BaseHandler):
                     room_end_token = "s%d" % (event.stream_ordering,)
                     deferred_room_state = run_in_background(
                         self.store.get_state_for_events,
-                        [event.event_id], None,
+                        [event.event_id],
                     )
                     deferred_room_state.addCallback(
                         lambda states: states[event.event_id]
@@ -301,7 +301,7 @@ class InitialSyncHandler(BaseHandler):
     def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
                                   membership, member_event_id, is_peeking):
         room_state = yield self.store.get_state_for_events(
-            [member_event_id], None
+            [member_event_id],
         )
 
         room_state = room_state[member_event_id]
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e484061cc0..a7cd779b02 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,9 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import sys
 
-import six
 from six import iteritems, itervalues, string_types
 
 from canonicaljson import encode_canonical_json, json
@@ -37,6 +35,7 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
@@ -82,7 +81,7 @@ class MessageHandler(object):
         elif membership == Membership.LEAVE:
             key = (event_type, state_key)
             room_state = yield self.store.get_state_for_events(
-                [membership_event_id], [key]
+                [membership_event_id], StateFilter.from_types([key])
             )
             data = room_state[membership_event_id].get(key)
 
@@ -90,7 +89,7 @@ class MessageHandler(object):
 
     @defer.inlineCallbacks
     def get_state_events(
-        self, user_id, room_id, types=None, filtered_types=None,
+        self, user_id, room_id, state_filter=StateFilter.all(),
         at_token=None, is_guest=False,
     ):
         """Retrieve all state events for a given room. If the user is
@@ -102,13 +101,8 @@ class MessageHandler(object):
         Args:
             user_id(str): The user requesting state events.
             room_id(str): The room ID to get all state events from.
-            types(list[(str, str|None)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. If `state_key` is None,
-                all events are returned of the given type.
-                May be None, which matches any key.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
             at_token(StreamToken|None): the stream token of the at which we are requesting
                 the stats. If the user is not allowed to view the state as of that
                 stream token, we raise a 403 SynapseError. If None, returns the current
@@ -141,7 +135,7 @@ class MessageHandler(object):
             event = last_events[0]
             if visible_events:
                 room_state = yield self.store.get_state_for_events(
-                    [event.event_id], types, filtered_types=filtered_types,
+                    [event.event_id], state_filter=state_filter,
                 )
                 room_state = room_state[event.event_id]
             else:
@@ -160,12 +154,12 @@ class MessageHandler(object):
 
             if membership == Membership.JOIN:
                 state_ids = yield self.store.get_filtered_current_state_ids(
-                    room_id, types, filtered_types=filtered_types,
+                    room_id, state_filter=state_filter,
                 )
                 room_state = yield self.store.get_events(state_ids.values())
             elif membership == Membership.LEAVE:
                 room_state = yield self.store.get_state_for_events(
-                    [membership_event_id], types, filtered_types=filtered_types,
+                    [membership_event_id], state_filter=state_filter,
                 )
                 room_state = room_state[membership_event_id]
 
@@ -433,6 +427,9 @@ class EventCreationHandler(object):
 
         if event.is_state():
             prev_state = yield self.deduplicate_state_event(event, context)
+            logger.info(
+                "Not bothering to persist duplicate state event %s", event.event_id,
+            )
             if prev_state is not None:
                 defer.returnValue(prev_state)
 
@@ -624,6 +621,9 @@ class EventCreationHandler(object):
             event, context
         )
 
+        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+        # hack around with a try/finally instead.
+        success = False
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
@@ -636,6 +636,7 @@ class EventCreationHandler(object):
                     ratelimit=ratelimit,
                     extra_users=extra_users,
                 )
+                success = True
                 return
 
             yield self.persist_and_notify_client_event(
@@ -645,17 +646,16 @@ class EventCreationHandler(object):
                 ratelimit=ratelimit,
                 extra_users=extra_users,
             )
-        except:  # noqa: E722, as we reraise the exception this is fine.
-            # Ensure that we actually remove the entries in the push actions
-            # staging area, if we calculated them.
-            tp, value, tb = sys.exc_info()
-
-            run_in_background(
-                self.store.remove_push_actions_from_staging,
-                event.event_id,
-            )
 
-            six.reraise(tp, value, tb)
+            success = True
+        finally:
+            if not success:
+                # Ensure that we actually remove the entries in the push actions
+                # staging area, if we calculated them.
+                run_in_background(
+                    self.store.remove_push_actions_from_staging,
+                    event.event_id,
+                )
 
     @defer.inlineCallbacks
     def persist_and_notify_client_event(
@@ -778,7 +778,7 @@ class EventCreationHandler(object):
             event, context=context
         )
 
-        self.pusher_pool.on_new_notifications(
+        yield self.pusher_pool.on_new_notifications(
             event_stream_id, max_stream_id,
         )
 
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 5170d093e3..43f81bd607 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -21,6 +21,7 @@ from twisted.python.failure import Failure
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.events.utils import serialize_event
+from synapse.storage.state import StateFilter
 from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import ReadWriteLock
 from synapse.util.logcontext import run_in_background
@@ -255,28 +256,19 @@ class PaginationHandler(object):
         if event_filter and event_filter.lazy_load_members():
             # TODO: remove redundant members
 
-            types = [
-                (EventTypes.Member, state_key)
-                for state_key in set(
-                    event.sender  # FIXME: we also care about invite targets etc.
-                    for event in events
-                )
-            ]
+            # FIXME: we also care about invite targets etc.
+            state_filter = StateFilter.from_types(
+                (EventTypes.Member, event.sender)
+                for event in events
+            )
 
             state_ids = yield self.store.get_state_ids_for_event(
-                events[0].event_id, types=types,
+                events[0].event_id, state_filter=state_filter,
             )
 
             if state_ids:
                 state = yield self.store.get_events(list(state_ids.values()))
-
-            if state:
-                state = yield filter_events_for_client(
-                    self.store,
-                    user_id,
-                    state.values(),
-                    is_peeking=(member_event_id is None),
-                )
+                state = state.values()
 
         time_now = self.clock.time_msec()
 
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 75b8b7ce6a..1dfbde84fd 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -142,10 +142,8 @@ class BaseProfileHandler(BaseHandler):
                 if e.code != 404:
                     logger.exception("Failed to get displayname")
                 raise
-            except Exception:
-                logger.exception("Failed to get displayname")
-            else:
-                defer.returnValue(result["displayname"])
+
+            defer.returnValue(result["displayname"])
 
     @defer.inlineCallbacks
     def set_displayname(self, target_user, requester, new_displayname, by_admin=False):
@@ -199,8 +197,6 @@ class BaseProfileHandler(BaseHandler):
                 if e.code != 404:
                     logger.exception("Failed to get avatar_url")
                 raise
-            except Exception:
-                logger.exception("Failed to get avatar_url")
 
             defer.returnValue(result["avatar_url"])
 
@@ -278,7 +274,7 @@ class BaseProfileHandler(BaseHandler):
             except Exception as e:
                 logger.warn(
                     "Failed to update join event for room %s - %s",
-                    room_id, str(e.message)
+                    room_id, str(e)
                 )
 
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a6f3181f09..4c2690ba26 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
             "receipt_key", max_batch_id, rooms=affected_room_ids
         )
         # Note that the min here shouldn't be relied upon to be accurate.
-        self.hs.get_pusherpool().on_new_receipts(
+        yield self.hs.get_pusherpool().on_new_receipts(
             min_batch_id, max_batch_id, affected_room_ids,
         )
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 1e53f2c635..d2beb275cf 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -220,15 +220,42 @@ class RegistrationHandler(BaseHandler):
 
         # auto-join the user to any rooms we're supposed to dump them into
         fake_requester = create_requester(user_id)
+
+        # try to create the room if we're the first user on the server
+        should_auto_create_rooms = False
+        if self.hs.config.autocreate_auto_join_rooms:
+            count = yield self.store.count_all_users()
+            should_auto_create_rooms = count == 1
+
         for r in self.hs.config.auto_join_rooms:
             try:
-                yield self._join_user_to_room(fake_requester, r)
+                if should_auto_create_rooms:
+                    room_alias = RoomAlias.from_string(r)
+                    if self.hs.hostname != room_alias.domain:
+                        logger.warning(
+                            'Cannot create room alias %s, '
+                            'it does not match server domain',
+                            r,
+                        )
+                    else:
+                        # create room expects the localpart of the room alias
+                        room_alias_localpart = room_alias.localpart
+
+                        # getting the RoomCreationHandler during init gives a dependency
+                        # loop
+                        yield self.hs.get_room_creation_handler().create_room(
+                            fake_requester,
+                            config={
+                                "preset": "public_chat",
+                                "room_alias_name": room_alias_localpart
+                            },
+                            ratelimit=False,
+                        )
+                else:
+                    yield self._join_user_to_room(fake_requester, r)
             except Exception as e:
                 logger.error("Failed to join new user to %r: %r", r, e)
 
-        # We used to generate default identicons here, but nowadays
-        # we want clients to generate their own as part of their branding
-        # rather than there being consistent matrix-wide ones, so we don't.
         defer.returnValue((user_id, token))
 
     @defer.inlineCallbacks
@@ -534,4 +561,5 @@ class RegistrationHandler(BaseHandler):
             room_id=room_id,
             remote_room_hosts=remote_room_hosts,
             action="join",
+            ratelimit=False,
         )
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index c3f820b975..3928faa6e7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -21,7 +21,7 @@ import math
 import string
 from collections import OrderedDict
 
-from six import string_types
+from six import iteritems, string_types
 
 from twisted.internet import defer
 
@@ -32,9 +32,11 @@ from synapse.api.constants import (
     JoinRules,
     RoomCreationPreset,
 )
-from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
+from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
+from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
+from synapse.util.async_helpers import Linearizer
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -72,6 +74,334 @@ class RoomCreationHandler(BaseHandler):
 
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.room_member_handler = hs.get_room_member_handler()
+
+        # linearizer to stop two upgrades happening at once
+        self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
+
+    @defer.inlineCallbacks
+    def upgrade_room(self, requester, old_room_id, new_version):
+        """Replace a room with a new room with a different version
+
+        Args:
+            requester (synapse.types.Requester): the user requesting the upgrade
+            old_room_id (unicode): the id of the room to be replaced
+            new_version (unicode): the new room version to use
+
+        Returns:
+            Deferred[unicode]: the new room id
+        """
+        yield self.ratelimit(requester)
+
+        user_id = requester.user.to_string()
+
+        with (yield self._upgrade_linearizer.queue(old_room_id)):
+            # start by allocating a new room id
+            r = yield self.store.get_room(old_room_id)
+            if r is None:
+                raise NotFoundError("Unknown room id %s" % (old_room_id,))
+            new_room_id = yield self._generate_room_id(
+                creator_id=user_id, is_public=r["is_public"],
+            )
+
+            logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
+
+            # we create and auth the tombstone event before properly creating the new
+            # room, to check our user has perms in the old room.
+            tombstone_event, tombstone_context = (
+                yield self.event_creation_handler.create_event(
+                    requester, {
+                        "type": EventTypes.Tombstone,
+                        "state_key": "",
+                        "room_id": old_room_id,
+                        "sender": user_id,
+                        "content": {
+                            "body": "This room has been replaced",
+                            "replacement_room": new_room_id,
+                        }
+                    },
+                    token_id=requester.access_token_id,
+                )
+            )
+            yield self.auth.check_from_context(tombstone_event, tombstone_context)
+
+            yield self.clone_exiting_room(
+                requester,
+                old_room_id=old_room_id,
+                new_room_id=new_room_id,
+                new_room_version=new_version,
+                tombstone_event_id=tombstone_event.event_id,
+            )
+
+            # now send the tombstone
+            yield self.event_creation_handler.send_nonmember_event(
+                requester, tombstone_event, tombstone_context,
+            )
+
+            old_room_state = yield tombstone_context.get_current_state_ids(self.store)
+
+            # update any aliases
+            yield self._move_aliases_to_new_room(
+                requester, old_room_id, new_room_id, old_room_state,
+            )
+
+            # and finally, shut down the PLs in the old room, and update them in the new
+            # room.
+            yield self._update_upgraded_room_pls(
+                requester, old_room_id, new_room_id, old_room_state,
+            )
+
+            defer.returnValue(new_room_id)
+
+    @defer.inlineCallbacks
+    def _update_upgraded_room_pls(
+            self, requester, old_room_id, new_room_id, old_room_state,
+    ):
+        """Send updated power levels in both rooms after an upgrade
+
+        Args:
+            requester (synapse.types.Requester): the user requesting the upgrade
+            old_room_id (unicode): the id of the room to be replaced
+            new_room_id (unicode): the id of the replacement room
+            old_room_state (dict[tuple[str, str], str]): the state map for the old room
+
+        Returns:
+            Deferred
+        """
+        old_room_pl_event_id = old_room_state.get((EventTypes.PowerLevels, ""))
+
+        if old_room_pl_event_id is None:
+            logger.warning(
+                "Not supported: upgrading a room with no PL event. Not setting PLs "
+                "in old room.",
+            )
+            return
+
+        old_room_pl_state = yield self.store.get_event(old_room_pl_event_id)
+
+        # we try to stop regular users from speaking by setting the PL required
+        # to send regular events and invites to 'Moderator' level. That's normally
+        # 50, but if the default PL in a room is 50 or more, then we set the
+        # required PL above that.
+
+        pl_content = dict(old_room_pl_state.content)
+        users_default = int(pl_content.get("users_default", 0))
+        restricted_level = max(users_default + 1, 50)
+
+        updated = False
+        for v in ("invite", "events_default"):
+            current = int(pl_content.get(v, 0))
+            if current < restricted_level:
+                logger.info(
+                    "Setting level for %s in %s to %i (was %i)",
+                    v, old_room_id, restricted_level, current,
+                )
+                pl_content[v] = restricted_level
+                updated = True
+            else:
+                logger.info(
+                    "Not setting level for %s (already %i)",
+                    v, current,
+                )
+
+        if updated:
+            try:
+                yield self.event_creation_handler.create_and_send_nonmember_event(
+                    requester, {
+                        "type": EventTypes.PowerLevels,
+                        "state_key": '',
+                        "room_id": old_room_id,
+                        "sender": requester.user.to_string(),
+                        "content": pl_content,
+                    }, ratelimit=False,
+                )
+            except AuthError as e:
+                logger.warning("Unable to update PLs in old room: %s", e)
+
+        logger.info("Setting correct PLs in new room")
+        yield self.event_creation_handler.create_and_send_nonmember_event(
+            requester, {
+                "type": EventTypes.PowerLevels,
+                "state_key": '',
+                "room_id": new_room_id,
+                "sender": requester.user.to_string(),
+                "content": old_room_pl_state.content,
+            }, ratelimit=False,
+        )
+
+    @defer.inlineCallbacks
+    def clone_exiting_room(
+            self, requester, old_room_id, new_room_id, new_room_version,
+            tombstone_event_id,
+    ):
+        """Populate a new room based on an old room
+
+        Args:
+            requester (synapse.types.Requester): the user requesting the upgrade
+            old_room_id (unicode): the id of the room to be replaced
+            new_room_id (unicode): the id to give the new room (should already have been
+                created with _gemerate_room_id())
+            new_room_version (unicode): the new room version to use
+            tombstone_event_id (unicode|str): the ID of the tombstone event in the old
+                room.
+        Returns:
+            Deferred[None]
+        """
+        user_id = requester.user.to_string()
+
+        if not self.spam_checker.user_may_create_room(user_id):
+            raise SynapseError(403, "You are not permitted to create rooms")
+
+        creation_content = {
+            "room_version": new_room_version,
+            "predecessor": {
+                "room_id": old_room_id,
+                "event_id": tombstone_event_id,
+            }
+        }
+
+        initial_state = dict()
+
+        types_to_copy = (
+            (EventTypes.JoinRules, ""),
+            (EventTypes.Name, ""),
+            (EventTypes.Topic, ""),
+            (EventTypes.RoomHistoryVisibility, ""),
+            (EventTypes.GuestAccess, ""),
+            (EventTypes.RoomAvatar, ""),
+        )
+
+        old_room_state_ids = yield self.store.get_filtered_current_state_ids(
+            old_room_id, StateFilter.from_types(types_to_copy),
+        )
+        # map from event_id to BaseEvent
+        old_room_state_events = yield self.store.get_events(old_room_state_ids.values())
+
+        for k, old_event_id in iteritems(old_room_state_ids):
+            old_event = old_room_state_events.get(old_event_id)
+            if old_event:
+                initial_state[k] = old_event.content
+
+        yield self._send_events_for_new_room(
+            requester,
+            new_room_id,
+
+            # we expect to override all the presets with initial_state, so this is
+            # somewhat arbitrary.
+            preset_config=RoomCreationPreset.PRIVATE_CHAT,
+
+            invite_list=[],
+            initial_state=initial_state,
+            creation_content=creation_content,
+        )
+
+        # XXX invites/joins
+        # XXX 3pid invites
+
+    @defer.inlineCallbacks
+    def _move_aliases_to_new_room(
+            self, requester, old_room_id, new_room_id, old_room_state,
+    ):
+        directory_handler = self.hs.get_handlers().directory_handler
+
+        aliases = yield self.store.get_aliases_for_room(old_room_id)
+
+        # check to see if we have a canonical alias.
+        canonical_alias = None
+        canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, ""))
+        if canonical_alias_event_id:
+            canonical_alias_event = yield self.store.get_event(canonical_alias_event_id)
+            if canonical_alias_event:
+                canonical_alias = canonical_alias_event.content.get("alias", "")
+
+        # first we try to remove the aliases from the old room (we suppress sending
+        # the room_aliases event until the end).
+        #
+        # Note that we'll only be able to remove aliases that (a) aren't owned by an AS,
+        # and (b) unless the user is a server admin, which the user created.
+        #
+        # This is probably correct - given we don't allow such aliases to be deleted
+        # normally, it would be odd to allow it in the case of doing a room upgrade -
+        # but it makes the upgrade less effective, and you have to wonder why a room
+        # admin can't remove aliases that point to that room anyway.
+        # (cf https://github.com/matrix-org/synapse/issues/2360)
+        #
+        removed_aliases = []
+        for alias_str in aliases:
+            alias = RoomAlias.from_string(alias_str)
+            try:
+                yield directory_handler.delete_association(
+                    requester, alias, send_event=False,
+                )
+                removed_aliases.append(alias_str)
+            except SynapseError as e:
+                logger.warning(
+                    "Unable to remove alias %s from old room: %s",
+                    alias, e,
+                )
+
+        # if we didn't find any aliases, or couldn't remove anyway, we can skip the rest
+        # of this.
+        if not removed_aliases:
+            return
+
+        try:
+            # this can fail if, for some reason, our user doesn't have perms to send
+            # m.room.aliases events in the old room (note that we've already checked that
+            # they have perms to send a tombstone event, so that's not terribly likely).
+            #
+            # If that happens, it's regrettable, but we should carry on: it's the same
+            # as when you remove an alias from the directory normally - it just means that
+            # the aliases event gets out of sync with the directory
+            # (cf https://github.com/vector-im/riot-web/issues/2369)
+            yield directory_handler.send_room_alias_update_event(
+                requester, old_room_id,
+            )
+        except AuthError as e:
+            logger.warning(
+                "Failed to send updated alias event on old room: %s", e,
+            )
+
+        # we can now add any aliases we successfully removed to the new room.
+        for alias in removed_aliases:
+            try:
+                yield directory_handler.create_association(
+                    requester, RoomAlias.from_string(alias),
+                    new_room_id, servers=(self.hs.hostname, ),
+                    send_event=False,
+                )
+                logger.info("Moved alias %s to new room", alias)
+            except SynapseError as e:
+                # I'm not really expecting this to happen, but it could if the spam
+                # checking module decides it shouldn't, or similar.
+                logger.error(
+                    "Error adding alias %s to new room: %s",
+                    alias, e,
+                )
+
+        try:
+            if canonical_alias and (canonical_alias in removed_aliases):
+                yield self.event_creation_handler.create_and_send_nonmember_event(
+                    requester,
+                    {
+                        "type": EventTypes.CanonicalAlias,
+                        "state_key": "",
+                        "room_id": new_room_id,
+                        "sender": requester.user.to_string(),
+                        "content": {"alias": canonical_alias, },
+                    },
+                    ratelimit=False
+                )
+
+            yield directory_handler.send_room_alias_update_event(
+                requester, new_room_id,
+            )
+        except SynapseError as e:
+            # again I'm not really expecting this to fail, but if it does, I'd rather
+            # we returned the new room to the client at this point.
+            logger.error(
+                "Unable to send updated alias events in new room: %s", e,
+            )
 
     @defer.inlineCallbacks
     def create_room(self, requester, config, ratelimit=True,
@@ -164,36 +494,16 @@ class RoomCreationHandler(BaseHandler):
         visibility = config.get("visibility", None)
         is_public = visibility == "public"
 
-        # autogen room IDs and try to create it. We may clash, so just
-        # try a few times till one goes through, giving up eventually.
-        attempts = 0
-        room_id = None
-        while attempts < 5:
-            try:
-                random_string = stringutils.random_string(18)
-                gen_room_id = RoomID(
-                    random_string,
-                    self.hs.hostname,
-                )
-                yield self.store.store_room(
-                    room_id=gen_room_id.to_string(),
-                    room_creator_user_id=user_id,
-                    is_public=is_public
-                )
-                room_id = gen_room_id.to_string()
-                break
-            except StoreError:
-                attempts += 1
-        if not room_id:
-            raise StoreError(500, "Couldn't generate a room ID.")
+        room_id = yield self._generate_room_id(creator_id=user_id, is_public=is_public)
 
         if room_alias:
             directory_handler = self.hs.get_handlers().directory_handler
             yield directory_handler.create_association(
-                user_id=user_id,
+                requester=requester,
                 room_id=room_id,
                 room_alias=room_alias,
                 servers=[self.hs.hostname],
+                send_event=False,
             )
 
         preset_config = config.get(
@@ -214,18 +524,15 @@ class RoomCreationHandler(BaseHandler):
         # override any attempt to set room versions via the creation_content
         creation_content["room_version"] = room_version
 
-        room_member_handler = self.hs.get_room_member_handler()
-
         yield self._send_events_for_new_room(
             requester,
             room_id,
-            room_member_handler,
             preset_config=preset_config,
             invite_list=invite_list,
             initial_state=initial_state,
             creation_content=creation_content,
             room_alias=room_alias,
-            power_level_content_override=config.get("power_level_content_override", {}),
+            power_level_content_override=config.get("power_level_content_override"),
             creator_join_profile=creator_join_profile,
         )
 
@@ -261,7 +568,7 @@ class RoomCreationHandler(BaseHandler):
             if is_direct:
                 content["is_direct"] = is_direct
 
-            yield room_member_handler.update_membership(
+            yield self.room_member_handler.update_membership(
                 requester,
                 UserID.from_string(invitee),
                 room_id,
@@ -289,7 +596,7 @@ class RoomCreationHandler(BaseHandler):
         if room_alias:
             result["room_alias"] = room_alias.to_string()
             yield directory_handler.send_room_alias_update_event(
-                requester, user_id, room_id
+                requester, room_id
             )
 
         defer.returnValue(result)
@@ -299,14 +606,13 @@ class RoomCreationHandler(BaseHandler):
             self,
             creator,  # A Requester object.
             room_id,
-            room_member_handler,
             preset_config,
             invite_list,
             initial_state,
             creation_content,
-            room_alias,
-            power_level_content_override,
-            creator_join_profile,
+            room_alias=None,
+            power_level_content_override=None,
+            creator_join_profile=None,
     ):
         def create(etype, content, **kwargs):
             e = {
@@ -322,6 +628,7 @@ class RoomCreationHandler(BaseHandler):
         @defer.inlineCallbacks
         def send(etype, content, **kwargs):
             event = create(etype, content, **kwargs)
+            logger.info("Sending %s in new room", etype)
             yield self.event_creation_handler.create_and_send_nonmember_event(
                 creator,
                 event,
@@ -344,7 +651,8 @@ class RoomCreationHandler(BaseHandler):
             content=creation_content,
         )
 
-        yield room_member_handler.update_membership(
+        logger.info("Sending %s in new room", EventTypes.Member)
+        yield self.room_member_handler.update_membership(
             creator,
             creator.user,
             room_id,
@@ -386,7 +694,8 @@ class RoomCreationHandler(BaseHandler):
                 for invitee in invite_list:
                     power_level_content["users"][invitee] = 100
 
-            power_level_content.update(power_level_content_override)
+            if power_level_content_override:
+                power_level_content.update(power_level_content_override)
 
             yield send(
                 etype=EventTypes.PowerLevels,
@@ -425,6 +734,30 @@ class RoomCreationHandler(BaseHandler):
                 content=content,
             )
 
+    @defer.inlineCallbacks
+    def _generate_room_id(self, creator_id, is_public):
+        # autogen room IDs and try to create it. We may clash, so just
+        # try a few times till one goes through, giving up eventually.
+        attempts = 0
+        while attempts < 5:
+            try:
+                random_string = stringutils.random_string(18)
+                gen_room_id = RoomID(
+                    random_string,
+                    self.hs.hostname,
+                ).to_string()
+                if isinstance(gen_room_id, bytes):
+                    gen_room_id = gen_room_id.decode('utf-8')
+                yield self.store.store_room(
+                    room_id=gen_room_id,
+                    room_creator_user_id=creator_id,
+                    is_public=is_public,
+                )
+                defer.returnValue(gen_room_id)
+            except StoreError:
+                attempts += 1
+        raise StoreError(500, "Couldn't generate a room ID.")
+
 
 class RoomContextHandler(object):
     def __init__(self, hs):
@@ -488,23 +821,24 @@ class RoomContextHandler(object):
         else:
             last_event_id = event_id
 
-        types = None
-        filtered_types = None
         if event_filter and event_filter.lazy_load_members():
-            members = set(ev.sender for ev in itertools.chain(
-                results["events_before"],
-                (results["event"],),
-                results["events_after"],
-            ))
-            filtered_types = [EventTypes.Member]
-            types = [(EventTypes.Member, member) for member in members]
+            state_filter = StateFilter.from_lazy_load_member_list(
+                ev.sender
+                for ev in itertools.chain(
+                    results["events_before"],
+                    (results["event"],),
+                    results["events_after"],
+                )
+            )
+        else:
+            state_filter = StateFilter.all()
 
         # XXX: why do we return the state as of the last event rather than the
         # first? Shouldn't we be consistent with /sync?
         # https://github.com/matrix-org/matrix-doc/issues/687
 
         state = yield self.store.get_state_for_events(
-            [last_event_id], types, filtered_types=filtered_types,
+            [last_event_id], state_filter=state_filter,
         )
         results["state"] = list(state[last_event_id].values())
 
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 37e41afd61..dc88620885 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -16,7 +16,7 @@
 import logging
 from collections import namedtuple
 
-from six import iteritems
+from six import PY3, iteritems
 from six.moves import range
 
 import msgpack
@@ -162,7 +162,7 @@ class RoomListHandler(BaseHandler):
         # Filter out rooms that we don't want to return
         rooms_to_scan = [
             r for r in sorted_rooms
-            if r not in newly_unpublished and rooms_to_num_joined[room_id] > 0
+            if r not in newly_unpublished and rooms_to_num_joined[r] > 0
         ]
 
         total_room_count = len(rooms_to_scan)
@@ -444,9 +444,16 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", (
 
     @classmethod
     def from_token(cls, token):
+        if PY3:
+            # The argument raw=False is only available on new versions of
+            # msgpack, and only really needed on Python 3. Gate it behind
+            # a PY3 check to avoid causing issues on Debian-packaged versions.
+            decoded = msgpack.loads(decode_base64(token), raw=False)
+        else:
+            decoded = msgpack.loads(decode_base64(token))
         return RoomListNextBatch(**{
             cls.REVERSE_KEY_DICT[key]: val
-            for key, val in msgpack.loads(decode_base64(token)).items()
+            for key, val in decoded.items()
         })
 
     def to_token(self):
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f643619047..07fd3e82fc 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -583,6 +583,11 @@ class RoomMemberHandler(object):
         room_id = mapping["room_id"]
         servers = mapping["servers"]
 
+        # put the server which owns the alias at the front of the server list.
+        if room_alias.domain in servers:
+            servers.remove(room_alias.domain)
+        servers.insert(0, room_alias.domain)
+
         defer.returnValue((RoomID.from_string(room_id), servers))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index c464adbd0b..80e7b15de8 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -24,6 +24,7 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
 from synapse.events.utils import serialize_event
+from synapse.storage.state import StateFilter
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -54,7 +55,7 @@ class SearchHandler(BaseHandler):
         batch_token = None
         if batch:
             try:
-                b = decode_base64(batch)
+                b = decode_base64(batch).decode('ascii')
                 batch_group, batch_group_key, batch_token = b.split("\n")
 
                 assert batch_group is not None
@@ -258,18 +259,18 @@ class SearchHandler(BaseHandler):
                 # it returns more from the same group (if applicable) rather
                 # than reverting to searching all results again.
                 if batch_group and batch_group_key:
-                    global_next_batch = encode_base64("%s\n%s\n%s" % (
+                    global_next_batch = encode_base64(("%s\n%s\n%s" % (
                         batch_group, batch_group_key, pagination_token
-                    ))
+                    )).encode('ascii'))
                 else:
-                    global_next_batch = encode_base64("%s\n%s\n%s" % (
+                    global_next_batch = encode_base64(("%s\n%s\n%s" % (
                         "all", "", pagination_token
-                    ))
+                    )).encode('ascii'))
 
                 for room_id, group in room_groups.items():
-                    group["next_batch"] = encode_base64("%s\n%s\n%s" % (
+                    group["next_batch"] = encode_base64(("%s\n%s\n%s" % (
                         "room_id", room_id, pagination_token
-                    ))
+                    )).encode('ascii'))
 
             allowed_events.extend(room_events)
 
@@ -324,9 +325,12 @@ class SearchHandler(BaseHandler):
                     else:
                         last_event_id = event.event_id
 
+                    state_filter = StateFilter.from_types(
+                        [(EventTypes.Member, sender) for sender in senders]
+                    )
+
                     state = yield self.store.get_state_for_event(
-                        last_event_id,
-                        types=[(EventTypes.Member, sender) for sender in senders]
+                        last_event_id, state_filter
                     )
 
                     res["profile_info"] = {
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ef20c2296c..09739f2862 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,10 +20,14 @@ import logging
 
 from six import iteritems, itervalues
 
+from prometheus_client import Counter
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
+from synapse.storage.roommember import MemberSummary
+from synapse.storage.state import StateFilter
 from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -35,6 +39,19 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+
+# Counts the number of times we returned a non-empty sync. `type` is one of
+# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
+# "true" or "false" depending on if the request asked for lazy loaded members or
+# not.
+non_empty_sync_counter = Counter(
+    "synapse_handlers_sync_nonempty_total",
+    "Count of non empty sync responses. type is initial_sync/full_state_sync"
+    "/incremental_sync. lazy_loaded indicates if lazy loaded members were "
+    "enabled for that request.",
+    ["type", "lazy_loaded"],
+)
+
 # Store the cache that tracks which lazy-loaded members have been sent to a given
 # client for no more than 30 minutes.
 LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
@@ -226,14 +243,16 @@ class SyncHandler(object):
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
                                 full_state):
+        if since_token is None:
+            sync_type = "initial_sync"
+        elif full_state:
+            sync_type = "full_state_sync"
+        else:
+            sync_type = "incremental_sync"
+
         context = LoggingContext.current_context()
         if context:
-            if since_token is None:
-                context.tag = "initial_sync"
-            elif full_state:
-                context.tag = "full_state_sync"
-            else:
-                context.tag = "incremental_sync"
+            context.tag = sync_type
 
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
@@ -241,7 +260,6 @@ class SyncHandler(object):
             result = yield self.current_sync_for_user(
                 sync_config, since_token, full_state=full_state,
             )
-            defer.returnValue(result)
         else:
             def current_sync_callback(before_token, after_token):
                 return self.current_sync_for_user(sync_config, since_token)
@@ -250,7 +268,15 @@ class SyncHandler(object):
                 sync_config.user.to_string(), timeout, current_sync_callback,
                 from_token=since_token,
             )
-            defer.returnValue(result)
+
+        if result:
+            if sync_config.filter_collection.lazy_load_members():
+                lazy_loaded = "true"
+            else:
+                lazy_loaded = "false"
+            non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
+
+        defer.returnValue(result)
 
     def current_sync_for_user(self, sync_config, since_token=None,
                               full_state=False):
@@ -444,25 +470,20 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def get_state_after_event(self, event, types=None, filtered_types=None):
+    def get_state_after_event(self, event, state_filter=StateFilter.all()):
         """
         Get the room state after the given event
 
         Args:
             event(synapse.events.EventBase): event of interest
-            types(list[(str, str|None)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. If `state_key` is None,
-                all events are returned of the given type.
-                May be None, which matches any key.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
         state_ids = yield self.store.get_state_ids_for_event(
-            event.event_id, types, filtered_types=filtered_types,
+            event.event_id, state_filter=state_filter,
         )
         if event.is_state():
             state_ids = state_ids.copy()
@@ -470,18 +491,14 @@ class SyncHandler(object):
         defer.returnValue(state_ids)
 
     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
+    def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()):
         """ Get the room state at a particular stream position
 
         Args:
             room_id(str): room for which to get state
             stream_position(StreamToken): point at which to get state
-            types(list[(str, str|None)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. If `state_key` is None,
-                all events are returned of the given type.
-            filtered_types(list[str]|None): Only apply filtering via `types` to this
-                list of event types.  Other types of events are returned unfiltered.
-                If None, `types` filtering is applied to all events.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
@@ -497,7 +514,7 @@ class SyncHandler(object):
         if last_events:
             last_event = last_events[-1]
             state = yield self.get_state_after_event(
-                last_event, types, filtered_types=filtered_types,
+                last_event, state_filter=state_filter,
             )
 
         else:
@@ -525,6 +542,8 @@ class SyncHandler(object):
              A deferred dict describing the room summary
         """
 
+        # FIXME: we could/should get this from room_stats when matthew/stats lands
+
         # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
         last_events, _ = yield self.store.get_recent_event_ids_for_room(
             room_id, end_token=now_token.room_key, limit=1,
@@ -536,45 +555,69 @@ class SyncHandler(object):
 
         last_event = last_events[-1]
         state_ids = yield self.store.get_state_ids_for_event(
-            last_event.event_id, [
-                (EventTypes.Member, None),
+            last_event.event_id,
+            state_filter=StateFilter.from_types([
                 (EventTypes.Name, ''),
                 (EventTypes.CanonicalAlias, ''),
-            ]
+            ]),
         )
 
-        member_ids = {
-            state_key: event_id
-            for (t, state_key), event_id in state_ids.iteritems()
-            if t == EventTypes.Member
-        }
+        # this is heavily cached, thus: fast.
+        details = yield self.store.get_room_summary(room_id)
+
         name_id = state_ids.get((EventTypes.Name, ''))
         canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
 
         summary = {}
-
-        # FIXME: it feels very heavy to load up every single membership event
-        # just to calculate the counts.
-        member_events = yield self.store.get_events(member_ids.values())
-
-        joined_user_ids = []
-        invited_user_ids = []
-
-        for ev in member_events.values():
-            if ev.content.get("membership") == Membership.JOIN:
-                joined_user_ids.append(ev.state_key)
-            elif ev.content.get("membership") == Membership.INVITE:
-                invited_user_ids.append(ev.state_key)
+        empty_ms = MemberSummary([], 0)
 
         # TODO: only send these when they change.
-        summary["m.joined_member_count"] = len(joined_user_ids)
-        summary["m.invited_member_count"] = len(invited_user_ids)
+        summary["m.joined_member_count"] = (
+            details.get(Membership.JOIN, empty_ms).count
+        )
+        summary["m.invited_member_count"] = (
+            details.get(Membership.INVITE, empty_ms).count
+        )
 
-        if name_id or canonical_alias_id:
-            defer.returnValue(summary)
+        # if the room has a name or canonical_alias set, we can skip
+        # calculating heroes.  we assume that if the event has contents, it'll
+        # be a valid name or canonical_alias - i.e. we're checking that they
+        # haven't been "deleted" by blatting {} over the top.
+        if name_id:
+            name = yield self.store.get_event(name_id, allow_none=True)
+            if name and name.content:
+                defer.returnValue(summary)
+
+        if canonical_alias_id:
+            canonical_alias = yield self.store.get_event(
+                canonical_alias_id, allow_none=True,
+            )
+            if canonical_alias and canonical_alias.content:
+                defer.returnValue(summary)
+
+        joined_user_ids = [
+            r[0] for r in details.get(Membership.JOIN, empty_ms).members
+        ]
+        invited_user_ids = [
+            r[0] for r in details.get(Membership.INVITE, empty_ms).members
+        ]
+        gone_user_ids = (
+            [r[0] for r in details.get(Membership.LEAVE, empty_ms).members] +
+            [r[0] for r in details.get(Membership.BAN, empty_ms).members]
+        )
 
-        # FIXME: order by stream ordering, not alphabetic
+        # FIXME: only build up a member_ids list for our heroes
+        member_ids = {}
+        for membership in (
+            Membership.JOIN,
+            Membership.INVITE,
+            Membership.LEAVE,
+            Membership.BAN
+        ):
+            for user_id, event_id in details.get(membership, empty_ms).members:
+                member_ids[user_id] = event_id
 
+        # FIXME: order by stream ordering rather than as returned by SQL
         me = sync_config.user.to_string()
         if (joined_user_ids or invited_user_ids):
             summary['m.heroes'] = sorted(
@@ -586,7 +629,11 @@ class SyncHandler(object):
             )[0:5]
         else:
             summary['m.heroes'] = sorted(
-                [user_id for user_id in member_ids.keys() if user_id != me]
+                [
+                    user_id
+                    for user_id in gone_user_ids
+                    if user_id != me
+                ]
             )[0:5]
 
         if not sync_config.filter_collection.lazy_load_members():
@@ -663,8 +710,7 @@ class SyncHandler(object):
 
         with Measure(self.clock, "compute_state_delta"):
 
-            types = None
-            filtered_types = None
+            members_to_fetch = None
 
             lazy_load_members = sync_config.filter_collection.lazy_load_members()
             include_redundant_members = (
@@ -675,16 +721,21 @@ class SyncHandler(object):
                 # We only request state for the members needed to display the
                 # timeline:
 
-                types = [
-                    (EventTypes.Member, state_key)
-                    for state_key in set(
-                        event.sender  # FIXME: we also care about invite targets etc.
-                        for event in batch.events
-                    )
-                ]
+                members_to_fetch = set(
+                    event.sender  # FIXME: we also care about invite targets etc.
+                    for event in batch.events
+                )
+
+                if full_state:
+                    # always make sure we LL ourselves so we know we're in the room
+                    # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
+                    # We only need apply this on full state syncs given we disabled
+                    # LL for incr syncs in #3840.
+                    members_to_fetch.add(sync_config.user.to_string())
 
-                # only apply the filtering to room members
-                filtered_types = [EventTypes.Member]
+                state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+            else:
+                state_filter = StateFilter.all()
 
             timeline_state = {
                 (event.type, event.state_key): event.event_id
@@ -694,19 +745,17 @@ class SyncHandler(object):
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id, types=types,
-                        filtered_types=filtered_types,
+                        batch.events[-1].event_id, state_filter=state_filter,
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id, types=types,
-                        filtered_types=filtered_types,
+                        batch.events[0].event_id, state_filter=state_filter,
                     )
 
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token, types=types,
-                        filtered_types=filtered_types,
+                        room_id, stream_position=now_token,
+                        state_filter=state_filter,
                     )
 
                     state_ids = current_state_ids
@@ -719,19 +768,31 @@ class SyncHandler(object):
                     lazy_load_members=lazy_load_members,
                 )
             elif batch.limited:
-                state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token, types=types,
-                    filtered_types=filtered_types,
+                state_at_timeline_start = yield self.store.get_state_ids_for_event(
+                    batch.events[0].event_id, state_filter=state_filter,
                 )
 
-                current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id, types=types,
-                    filtered_types=filtered_types,
+                # for now, we disable LL for gappy syncs - see
+                # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
+                # N.B. this slows down incr syncs as we are now processing way
+                # more state in the server than if we were LLing.
+                #
+                # We still have to filter timeline_start to LL entries (above) in order
+                # for _calculate_state's LL logic to work, as we have to include LL
+                # members for timeline senders in case they weren't loaded in the initial
+                # sync.  We do this by (counterintuitively) by filtering timeline_start
+                # members to just be ones which were timeline senders, which then ensures
+                # all of the rest get included in the state block (if we need to know
+                # about them).
+                state_filter = StateFilter.all()
+
+                state_at_previous_sync = yield self.get_state_at(
+                    room_id, stream_position=since_token,
+                    state_filter=state_filter,
                 )
 
-                state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id, types=types,
-                    filtered_types=filtered_types,
+                current_state_ids = yield self.store.get_state_ids_for_event(
+                    batch.events[-1].event_id, state_filter=state_filter,
                 )
 
                 state_ids = _calculate_state(
@@ -739,22 +800,28 @@ class SyncHandler(object):
                     timeline_start=state_at_timeline_start,
                     previous=state_at_previous_sync,
                     current=current_state_ids,
+                    # we have to include LL members in case LL initial sync missed them
                     lazy_load_members=lazy_load_members,
                 )
             else:
                 state_ids = {}
                 if lazy_load_members:
-                    if types:
-                        # We're returning an incremental sync, with no "gap" since
-                        # the previous sync, so normally there would be no state to return
+                    if members_to_fetch and batch.events:
+                        # We're returning an incremental sync, with no
+                        # "gap" since the previous sync, so normally there would be
+                        # no state to return.
                         # But we're lazy-loading, so the client might need some more
                         # member events to understand the events in this timeline.
                         # So we fish out all the member events corresponding to the
                         # timeline here, and then dedupe any redundant ones below.
 
                         state_ids = yield self.store.get_state_ids_for_event(
-                            batch.events[0].event_id, types=types,
-                            filtered_types=None,  # we only want members!
+                            batch.events[0].event_id,
+                            # we only want members!
+                            state_filter=StateFilter.from_types(
+                                (EventTypes.Member, member)
+                                for member in members_to_fetch
+                            ),
                         )
 
             if lazy_load_members and not include_redundant_members:
@@ -774,7 +841,7 @@ class SyncHandler(object):
                     logger.debug("filtering state from %r...", state_ids)
                     state_ids = {
                         t: event_id
-                        for t, event_id in state_ids.iteritems()
+                        for t, event_id in iteritems(state_ids)
                         if cache.get(t[1]) != event_id
                     }
                     logger.debug("...to %r", state_ids)
@@ -1575,6 +1642,19 @@ class SyncHandler(object):
             newly_joined_room=newly_joined,
         )
 
+        # When we join the room (or the client requests full_state), we should
+        # send down any existing tags. Usually the user won't have tags in a
+        # newly joined room, unless either a) they've joined before or b) the
+        # tag was added by synapse e.g. for server notice rooms.
+        if full_state:
+            user_id = sync_result_builder.sync_config.user.to_string()
+            tags = yield self.store.get_tags_for_room(user_id, room_id)
+
+            # If there aren't any tags, don't send the empty tags list down
+            # sync
+            if not tags:
+                tags = None
+
         account_data_events = []
         if tags is not None:
             account_data_events.append({
@@ -1603,10 +1683,24 @@ class SyncHandler(object):
         )
 
         summary = {}
+
+        # we include a summary in room responses when we're lazy loading
+        # members (as the client otherwise doesn't have enough info to form
+        # the name itself).
         if (
             sync_config.filter_collection.lazy_load_members() and
             (
+                # we recalulate the summary:
+                #   if there are membership changes in the timeline, or
+                #   if membership has changed during a gappy sync, or
+                #   if this is an initial sync.
                 any(ev.type == EventTypes.Member for ev in batch.events) or
+                (
+                    # XXX: this may include false positives in the form of LL
+                    # members which have snuck into state
+                    batch.limited and
+                    any(t == EventTypes.Member for (t, k) in state)
+                ) or
                 since_token is None
             )
         ):
@@ -1636,6 +1730,16 @@ class SyncHandler(object):
                     unread_notifications["highlight_count"] = notifs["highlight_count"]
 
                 sync_result_builder.joined.append(room_sync)
+
+            if batch.limited and since_token:
+                user_id = sync_result_builder.sync_config.user.to_string()
+                logger.info(
+                    "Incremental gappy sync of %s for user %s with %d state events" % (
+                        room_id,
+                        user_id,
+                        len(state),
+                    )
+                )
         elif room_builder.rtype == "archived":
             room_sync = ArchivedSyncResult(
                 room_id=room_id,
@@ -1729,17 +1833,17 @@ def _calculate_state(
     event_id_to_key = {
         e: key
         for key, e in itertools.chain(
-            timeline_contains.items(),
-            previous.items(),
-            timeline_start.items(),
-            current.items(),
+            iteritems(timeline_contains),
+            iteritems(previous),
+            iteritems(timeline_start),
+            iteritems(current),
         )
     }
 
-    c_ids = set(e for e in current.values())
-    ts_ids = set(e for e in timeline_start.values())
-    p_ids = set(e for e in previous.values())
-    tc_ids = set(e for e in timeline_contains.values())
+    c_ids = set(e for e in itervalues(current))
+    ts_ids = set(e for e in itervalues(timeline_start))
+    p_ids = set(e for e in itervalues(previous))
+    tc_ids = set(e for e in itervalues(timeline_contains))
 
     # If we are lazyloading room members, we explicitly add the membership events
     # for the senders in the timeline into the state block returned by /sync,
@@ -1753,7 +1857,7 @@ def _calculate_state(
 
     if lazy_load_members:
         p_ids.difference_update(
-            e for t, e in timeline_start.iteritems()
+            e for t, e in iteritems(timeline_start)
             if t[0] == EventTypes.Member
         )
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 2d2d3d5a0d..c610933dd4 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -20,6 +20,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import AuthError, SynapseError
 from synapse.types import UserID, get_domain_from_id
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -68,6 +69,11 @@ class TypingHandler(object):
         # map room IDs to sets of users currently typing
         self._room_typing = {}
 
+        # caches which room_ids changed at which serials
+        self._typing_stream_change_cache = StreamChangeCache(
+            "TypingStreamChangeCache", self._latest_room_serial,
+        )
+
         self.clock.looping_call(
             self._handle_timeouts,
             5000,
@@ -218,6 +224,7 @@ class TypingHandler(object):
 
             for domain in set(get_domain_from_id(u) for u in users):
                 if domain != self.server_name:
+                    logger.debug("sending typing update to %s", domain)
                     self.federation.send_edu(
                         destination=domain,
                         edu_type="m.typing",
@@ -274,19 +281,29 @@ class TypingHandler(object):
 
         self._latest_room_serial += 1
         self._room_serials[member.room_id] = self._latest_room_serial
+        self._typing_stream_change_cache.entity_has_changed(
+            member.room_id, self._latest_room_serial,
+        )
 
         self.notifier.on_new_event(
             "typing_key", self._latest_room_serial, rooms=[member.room_id]
         )
 
     def get_all_typing_updates(self, last_id, current_id):
-        # TODO: Work out a way to do this without scanning the entire state.
         if last_id == current_id:
             return []
 
+        changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
+            last_id,
+        )
+
+        if changed_rooms is None:
+            changed_rooms = self._room_serials
+
         rows = []
-        for room_id, serial in self._room_serials.items():
-            if last_id < serial and serial <= current_id:
+        for room_id in changed_rooms:
+            serial = self._room_serials[room_id]
+            if last_id < serial <= current_id:
                 typing = self._room_typing[room_id]
                 rows.append((serial, room_id, list(typing)))
         rows.sort()
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d8413d6aa7..f11b430126 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,6 +20,7 @@ from six import iteritems
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.roommember import ProfileInfo
 from synapse.types import get_localpart_from_id
 from synapse.util.metrics import Measure
@@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
         """
         return self.store.search_user_dir(user_id, search_term, limit)
 
-    @defer.inlineCallbacks
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
@@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
         if self._is_processing:
             return
 
+        @defer.inlineCallbacks
+        def process():
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._is_processing = False
+
         self._is_processing = True
-        try:
-            yield self._unsafe_process()
-        finally:
-            self._is_processing = False
+        run_as_background_process("user_directory.notify_new_event", process)
 
     @defer.inlineCallbacks
     def handle_local_profile_change(self, user_id, profile):