summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py6
-rw-r--r--synapse/api/errors.py16
-rwxr-xr-xsynapse/app/homeserver.py22
-rw-r--r--synapse/app/media_repository.py10
-rw-r--r--synapse/app/synchrotron.py3
-rw-r--r--synapse/appservice/__init__.py37
-rw-r--r--synapse/config/appservice.py1
-rw-r--r--synapse/config/homeserver.py3
-rw-r--r--synapse/config/push.py44
-rw-r--r--synapse/config/server.py6
-rw-r--r--synapse/config/user_directory.py44
-rw-r--r--synapse/crypto/event_signing.py13
-rw-r--r--synapse/federation/federation_client.py4
-rw-r--r--synapse/federation/transaction_queue.py10
-rw-r--r--synapse/handlers/appservice.py4
-rw-r--r--synapse/handlers/auth.py212
-rw-r--r--synapse/handlers/deactivate_account.py52
-rw-r--r--synapse/handlers/device.py20
-rw-r--r--synapse/handlers/groups_local.py11
-rw-r--r--synapse/handlers/initial_sync.py4
-rw-r--r--synapse/handlers/presence.py2
-rw-r--r--synapse/handlers/profile.py14
-rw-r--r--synapse/handlers/register.py8
-rw-r--r--synapse/handlers/room_list.py50
-rw-r--r--synapse/handlers/set_password.py56
-rw-r--r--synapse/handlers/user_directory.py72
-rw-r--r--synapse/http/endpoint.py6
-rw-r--r--synapse/http/server.py12
-rw-r--r--synapse/module_api/__init__.py14
-rw-r--r--synapse/notifier.py17
-rw-r--r--synapse/push/httppusher.py3
-rw-r--r--synapse/push/pusherpool.py30
-rw-r--r--synapse/replication/slave/storage/events.py39
-rw-r--r--synapse/replication/tcp/resource.py6
-rw-r--r--synapse/rest/client/v1/admin.py8
-rw-r--r--synapse/rest/client/v1/logout.py27
-rw-r--r--synapse/rest/client/v2_alpha/_base.py41
-rw-r--r--synapse/rest/client/v2_alpha/account.py125
-rw-r--r--synapse/rest/client/v2_alpha/devices.py36
-rw-r--r--synapse/rest/client/v2_alpha/groups.py22
-rw-r--r--synapse/rest/client/v2_alpha/register.py9
-rw-r--r--synapse/rest/client/versions.py1
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py16
-rw-r--r--synapse/server.py56
-rw-r--r--synapse/server.pyi18
-rw-r--r--synapse/storage/_base.py99
-rw-r--r--synapse/storage/account_data.py85
-rw-r--r--synapse/storage/background_updates.py35
-rw-r--r--synapse/storage/media_repository.py16
-rw-r--r--synapse/storage/profile.py27
-rw-r--r--synapse/storage/pusher.py76
-rw-r--r--synapse/storage/registration.py10
-rw-r--r--synapse/storage/schema/delta/44/expire_url_cache.sql5
-rw-r--r--synapse/storage/schema/delta/46/drop_unique_deleted_pushers.sql35
-rw-r--r--synapse/storage/schema/delta/46/local_media_repository_url_idx.sql24
-rw-r--r--synapse/storage/schema/delta/46/user_dir_null_room_ids.sql35
-rw-r--r--synapse/storage/state.py441
-rw-r--r--synapse/storage/stream.py4
-rw-r--r--synapse/storage/user_directory.py48
-rw-r--r--synapse/util/async.py6
-rw-r--r--synapse/util/distributor.py22
-rw-r--r--synapse/util/logcontext.py61
-rw-r--r--synapse/visibility.py4
63 files changed, 1462 insertions, 781 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 72858cca1f..ac0a3655a5 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -270,7 +270,11 @@ class Auth(object):
             rights (str): The operation being performed; the access token must
                 allow this.
         Returns:
-            dict : dict that includes the user and the ID of their access token.
+            Deferred[dict]: dict that includes:
+               `user` (UserID)
+               `is_guest` (bool)
+               `token_id` (int|None): access token id. May be None if guest
+               `device_id` (str|None): device corresponding to access token
         Raises:
             AuthError if no user by that token exists or the token is invalid.
         """
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index d0dfa959dc..79b35b3e7c 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -140,6 +140,22 @@ class RegistrationError(SynapseError):
     pass
 
 
+class InteractiveAuthIncompleteError(Exception):
+    """An error raised when UI auth is not yet complete
+
+    (This indicates we should return a 401 with 'result' as the body)
+
+    Attributes:
+        result (dict): the server response to the request, which should be
+            passed back to the client
+    """
+    def __init__(self, result):
+        super(InteractiveAuthIncompleteError, self).__init__(
+            "Interactive auth not yet complete",
+        )
+        self.result = result
+
+
 class UnrecognizedRequestError(SynapseError):
     """An error indicating we don't understand the request you're trying to make"""
     def __init__(self, *args, **kwargs):
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 9e26146338..6b8875afb4 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -43,7 +43,6 @@ from synapse.rest import ClientRestResource
 from synapse.rest.key.v1.server_key_resource import LocalKey
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
 from synapse.server import HomeServer
 from synapse.storage import are_all_users_on_domain
 from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
@@ -195,14 +194,19 @@ class SynapseHomeServer(HomeServer):
             })
 
         if name in ["media", "federation", "client"]:
-            media_repo = MediaRepositoryResource(self)
-            resources.update({
-                MEDIA_PREFIX: media_repo,
-                LEGACY_MEDIA_PREFIX: media_repo,
-                CONTENT_REPO_PREFIX: ContentRepoResource(
-                    self, self.config.uploads_path
-                ),
-            })
+            if self.get_config().enable_media_repo:
+                media_repo = self.get_media_repository_resource()
+                resources.update({
+                    MEDIA_PREFIX: media_repo,
+                    LEGACY_MEDIA_PREFIX: media_repo,
+                    CONTENT_REPO_PREFIX: ContentRepoResource(
+                        self, self.config.uploads_path
+                    ),
+                })
+            elif name == "media":
+                raise ConfigError(
+                    "'media' resource conflicts with enable_media_repo=False",
+                )
 
         if name in ["keys", "federation"]:
             resources.update({
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 36c18bdbcb..c4e5f0965d 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -35,7 +35,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
@@ -89,7 +88,7 @@ class MediaRepositoryServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(self)
                 elif name == "media":
-                    media_repo = MediaRepositoryResource(self)
+                    media_repo = self.get_media_repository_resource()
                     resources.update({
                         MEDIA_PREFIX: media_repo,
                         LEGACY_MEDIA_PREFIX: media_repo,
@@ -151,6 +150,13 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.media_repository"
 
+    if config.enable_media_repo:
+        _base.quit_with_error(
+            "enable_media_repo must be disabled in the main synapse process\n"
+            "before the media repo can be run in a separate worker.\n"
+            "Please add ``enable_media_repo: false`` to the main config\n"
+        )
+
     setup_logging(config, use_worker_options=True)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 576ac6fb7e..323fddee21 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -340,11 +340,10 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
         self.store = hs.get_datastore()
         self.typing_handler = hs.get_typing_handler()
+        # NB this is a SynchrotronPresence, not a normal PresenceHandler
         self.presence_handler = hs.get_presence_handler()
         self.notifier = hs.get_notifier()
 
-        self.presence_handler.sync_callback = self.send_user_sync
-
     def on_rdata(self, stream_name, token, rows):
         super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
 
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index b989007314..d5a7a5ce2f 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 from synapse.api.constants import EventTypes
 from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.types import GroupID, get_domain_from_id
 
 from twisted.internet import defer
 
@@ -81,12 +82,13 @@ class ApplicationService(object):
     # values.
     NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
 
-    def __init__(self, token, url=None, namespaces=None, hs_token=None,
+    def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None,
                  sender=None, id=None, protocols=None, rate_limited=True):
         self.token = token
         self.url = url
         self.hs_token = hs_token
         self.sender = sender
+        self.server_name = hostname
         self.namespaces = self._check_namespaces(namespaces)
         self.id = id
 
@@ -125,6 +127,24 @@ class ApplicationService(object):
                     raise ValueError(
                         "Expected bool for 'exclusive' in ns '%s'" % ns
                     )
+                group_id = regex_obj.get("group_id")
+                if group_id:
+                    if not isinstance(group_id, str):
+                        raise ValueError(
+                            "Expected string for 'group_id' in ns '%s'" % ns
+                        )
+                    try:
+                        GroupID.from_string(group_id)
+                    except Exception:
+                        raise ValueError(
+                            "Expected valid group ID for 'group_id' in ns '%s'" % ns
+                        )
+
+                    if get_domain_from_id(group_id) != self.server_name:
+                        raise ValueError(
+                            "Expected 'group_id' to be this host in ns '%s'" % ns
+                        )
+
                 regex = regex_obj.get("regex")
                 if isinstance(regex, basestring):
                     regex_obj["regex"] = re.compile(regex)  # Pre-compile regex
@@ -251,6 +271,21 @@ class ApplicationService(object):
             if regex_obj["exclusive"]
         ]
 
+    def get_groups_for_user(self, user_id):
+        """Get the groups that this user is associated with by this AS
+
+        Args:
+            user_id (str): The ID of the user.
+
+        Returns:
+            iterable[str]: an iterable that yields group_id strings.
+        """
+        return (
+            regex_obj["group_id"]
+            for regex_obj in self.namespaces[ApplicationService.NS_USERS]
+            if "group_id" in regex_obj and regex_obj["regex"].match(user_id)
+        )
+
     def is_rate_limited(self):
         return self.rate_limited
 
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index 82c50b8240..aba0aec6e8 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -154,6 +154,7 @@ def _load_appservice(hostname, as_info, config_filename):
         )
     return ApplicationService(
         token=as_info["as_token"],
+        hostname=hostname,
         url=as_info["url"],
         namespaces=as_info["namespaces"],
         hs_token=as_info["hs_token"],
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 05e242aef6..bf19cfee29 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -36,6 +36,7 @@ from .workers import WorkerConfig
 from .push import PushConfig
 from .spam_checker import SpamCheckerConfig
 from .groups import GroupsConfig
+from .user_directory import UserDirectoryConfig
 
 
 class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
@@ -44,7 +45,7 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
                        AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
                        JWTConfig, PasswordConfig, EmailConfig,
                        WorkerConfig, PasswordAuthProviderConfig, PushConfig,
-                       SpamCheckerConfig, GroupsConfig,):
+                       SpamCheckerConfig, GroupsConfig, UserDirectoryConfig,):
     pass
 
 
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 9c68318b40..b7e0d46afa 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,28 +19,43 @@ from ._base import Config
 
 class PushConfig(Config):
     def read_config(self, config):
-        self.push_redact_content = False
+        push_config = config.get("push", {})
+        self.push_include_content = push_config.get("include_content", True)
 
+        # There was a a 'redact_content' setting but mistakenly read from the
+        # 'email'section'. Check for the flag in the 'push' section, and log,
+        # but do not honour it to avoid nasty surprises when people upgrade.
+        if push_config.get("redact_content") is not None:
+            print(
+                "The push.redact_content content option has never worked. "
+                "Please set push.include_content if you want this behaviour"
+            )
+
+        # Now check for the one in the 'email' section and honour it,
+        # with a warning.
         push_config = config.get("email", {})
-        self.push_redact_content = push_config.get("redact_content", False)
+        redact_content = push_config.get("redact_content")
+        if redact_content is not None:
+            print(
+                "The 'email.redact_content' option is deprecated: "
+                "please set push.include_content instead"
+            )
+            self.push_include_content = not redact_content
 
     def default_config(self, config_dir_path, server_name, **kwargs):
         return """
-        # Control how push messages are sent to google/apple to notifications.
-        # Normally every message said in a room with one or more people using
-        # mobile devices will be posted to a push server hosted by matrix.org
-        # which is registered with google and apple in order to allow push
-        # notifications to be sent to these mobile devices.
-        #
-        # Setting redact_content to true will make the push messages contain no
-        # message content which will provide increased privacy. This is a
-        # temporary solution pending improvements to Android and iPhone apps
-        # to get content from the app rather than the notification.
-        #
+        # Clients requesting push notifications can either have the body of
+        # the message sent in the notification poke along with other details
+        # like the sender, or just the event ID and room ID (`event_id_only`).
+        # If clients choose the former, this option controls whether the
+        # notification request includes the content of the event (other details
+        # like the sender are still included). For `event_id_only` push, it
+        # has no effect.
+
         # For modern android devices the notification content will still appear
         # because it is loaded by the app. iPhone, however will send a
         # notification saying only that a message arrived and who it came from.
         #
         #push:
-        #   redact_content: false
+        #   include_content: true
         """
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 4d9193536d..edb90a1348 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -41,6 +41,12 @@ class ServerConfig(Config):
         # false only if we are updating the user directory in a worker
         self.update_user_directory = config.get("update_user_directory", True)
 
+        # whether to enable the media repository endpoints. This should be set
+        # to false if the media repository is running as a separate endpoint;
+        # doing so ensures that we will not run cache cleanup jobs on the
+        # master, potentially causing inconsistency.
+        self.enable_media_repo = config.get("enable_media_repo", True)
+
         self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
 
         # Whether we should block invites sent to users on this server
diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py
new file mode 100644
index 0000000000..38e8947843
--- /dev/null
+++ b/synapse/config/user_directory.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
+class UserDirectoryConfig(Config):
+    """User Directory Configuration
+    Configuration for the behaviour of the /user_directory API
+    """
+
+    def read_config(self, config):
+        self.user_directory_search_all_users = False
+        user_directory_config = config.get("user_directory", None)
+        if user_directory_config:
+            self.user_directory_search_all_users = (
+                user_directory_config.get("search_all_users", False)
+            )
+
+    def default_config(self, config_dir_path, server_name, **kwargs):
+        return """
+        # User Directory configuration
+        #
+        # 'search_all_users' defines whether to search all users visible to your HS
+        # when searching the user directory, rather than limiting to users visible
+        # in public rooms.  Defaults to false.  If you set it True, you'll have to run
+        # UPDATE user_directory_stream_pos SET stream_id = NULL;
+        # on your database to tell it to rebuild the user_directory search indexes.
+        #
+        #user_directory:
+        #   search_all_users: false
+        """
diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py
index 0d0e7b5286..aaa3efaca3 100644
--- a/synapse/crypto/event_signing.py
+++ b/synapse/crypto/event_signing.py
@@ -32,15 +32,22 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
     """Check whether the hash for this PDU matches the contents"""
     name, expected_hash = compute_content_hash(event, hash_algorithm)
     logger.debug("Expecting hash: %s", encode_base64(expected_hash))
-    if name not in event.hashes:
+
+    # some malformed events lack a 'hashes'. Protect against it being missing
+    # or a weird type by basically treating it the same as an unhashed event.
+    hashes = event.get("hashes")
+    if not isinstance(hashes, dict):
+        raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED)
+
+    if name not in hashes:
         raise SynapseError(
             400,
             "Algorithm %s not in hashes %s" % (
-                name, list(event.hashes),
+                name, list(hashes),
             ),
             Codes.UNAUTHORIZED,
         )
-    message_hash_base64 = event.hashes[name]
+    message_hash_base64 = hashes[name]
     try:
         message_hash_bytes = decode_base64(message_hash_base64)
     except Exception:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7c5e5d957f..b8f02f5391 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -25,7 +25,7 @@ from synapse.api.errors import (
 from synapse.util import unwrapFirstError, logcontext
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.events import FrozenEvent, builder
 import synapse.metrics
 
@@ -420,7 +420,7 @@ class FederationClient(FederationBase):
                 for e_id in batch
             ]
 
-            res = yield preserve_context_over_deferred(
+            res = yield make_deferred_yieldable(
                 defer.DeferredList(deferreds, consumeErrors=True)
             )
             for success, result in res:
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 7a3c9cbb70..3e7809b04f 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -20,7 +20,7 @@ from .persistence import TransactionActions
 from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException
-from synapse.util import logcontext
+from synapse.util import logcontext, PreserveLoggingContext
 from synapse.util.async import run_on_reactor
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
@@ -146,7 +146,6 @@ class TransactionQueue(object):
         else:
             return not destination.startswith("localhost")
 
-    @defer.inlineCallbacks
     def notify_new_events(self, current_id):
         """This gets called when we have some new events we might want to
         send out to other servers.
@@ -156,6 +155,13 @@ class TransactionQueue(object):
         if self._is_processing:
             return
 
+        # fire off a processing loop in the background. It's likely it will
+        # outlast the current request, so run it in the sentinel logcontext.
+        with PreserveLoggingContext():
+            self._process_event_queue_loop()
+
+    @defer.inlineCallbacks
+    def _process_event_queue_loop(self):
         try:
             self._is_processing = True
             while True:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 543bf28aec..feca3e4c10 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 
 import logging
 
@@ -159,7 +159,7 @@ class ApplicationServicesHandler(object):
     def query_3pe(self, kind, protocol, fields):
         services = yield self._get_services_for_3pn(protocol)
 
-        results = yield preserve_context_over_deferred(defer.DeferredList([
+        results = yield make_deferred_yieldable(defer.DeferredList([
             preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
             for service in services
         ], consumeErrors=True))
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 080eb14271..573c9db8a1 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -17,7 +17,10 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 from synapse.api.constants import LoginType
-from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
+from synapse.api.errors import (
+    AuthError, Codes, InteractiveAuthIncompleteError, LoginError, StoreError,
+    SynapseError,
+)
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
 from synapse.util.async import run_on_reactor
@@ -46,7 +49,6 @@ class AuthHandler(BaseHandler):
         """
         super(AuthHandler, self).__init__(hs)
         self.checkers = {
-            LoginType.PASSWORD: self._check_password_auth,
             LoginType.RECAPTCHA: self._check_recaptcha,
             LoginType.EMAIL_IDENTITY: self._check_email_identity,
             LoginType.MSISDN: self._check_msisdn,
@@ -75,15 +77,76 @@ class AuthHandler(BaseHandler):
         self.macaroon_gen = hs.get_macaroon_generator()
         self._password_enabled = hs.config.password_enabled
 
-        login_types = set()
+        # we keep this as a list despite the O(N^2) implication so that we can
+        # keep PASSWORD first and avoid confusing clients which pick the first
+        # type in the list. (NB that the spec doesn't require us to do so and
+        # clients which favour types that they don't understand over those that
+        # they do are technically broken)
+        login_types = []
         if self._password_enabled:
-            login_types.add(LoginType.PASSWORD)
+            login_types.append(LoginType.PASSWORD)
         for provider in self.password_providers:
             if hasattr(provider, "get_supported_login_types"):
-                login_types.update(
-                    provider.get_supported_login_types().keys()
-                )
-        self._supported_login_types = frozenset(login_types)
+                for t in provider.get_supported_login_types().keys():
+                    if t not in login_types:
+                        login_types.append(t)
+        self._supported_login_types = login_types
+
+    @defer.inlineCallbacks
+    def validate_user_via_ui_auth(self, requester, request_body, clientip):
+        """
+        Checks that the user is who they claim to be, via a UI auth.
+
+        This is used for things like device deletion and password reset where
+        the user already has a valid access token, but we want to double-check
+        that it isn't stolen by re-authenticating them.
+
+        Args:
+            requester (Requester): The user, as given by the access token
+
+            request_body (dict): The body of the request sent by the client
+
+            clientip (str): The IP address of the client.
+
+        Returns:
+            defer.Deferred[dict]: the parameters for this request (which may
+                have been given only in a previous call).
+
+        Raises:
+            InteractiveAuthIncompleteError if the client has not yet completed
+                any of the permitted login flows
+
+            AuthError if the client has completed a login flow, and it gives
+                a different user to `requester`
+        """
+
+        # build a list of supported flows
+        flows = [
+            [login_type] for login_type in self._supported_login_types
+        ]
+
+        result, params, _ = yield self.check_auth(
+            flows, request_body, clientip,
+        )
+
+        # find the completed login type
+        for login_type in self._supported_login_types:
+            if login_type not in result:
+                continue
+
+            user_id = result[login_type]
+            break
+        else:
+            # this can't happen
+            raise Exception(
+                "check_auth returned True but no successful login type",
+            )
+
+        # check that the UI auth matched the access token
+        if user_id != requester.user.to_string():
+            raise AuthError(403, "Invalid auth")
+
+        defer.returnValue(params)
 
     @defer.inlineCallbacks
     def check_auth(self, flows, clientdict, clientip):
@@ -95,26 +158,36 @@ class AuthHandler(BaseHandler):
         session with a map, which maps each auth-type (str) to the relevant
         identity authenticated by that auth-type (mostly str, but for captcha, bool).
 
+        If no auth flows have been completed successfully, raises an
+        InteractiveAuthIncompleteError. To handle this, you can use
+        synapse.rest.client.v2_alpha._base.interactive_auth_handler as a
+        decorator.
+
         Args:
             flows (list): A list of login flows. Each flow is an ordered list of
                           strings representing auth-types. At least one full
                           flow must be completed in order for auth to be successful.
+
             clientdict: The dictionary from the client root level, not the
                         'auth' key: this method prompts for auth if none is sent.
+
             clientip (str): The IP address of the client.
+
         Returns:
-            A tuple of (authed, dict, dict, session_id) where authed is true if
-            the client has successfully completed an auth flow. If it is true
-            the first dict contains the authenticated credentials of each stage.
+            defer.Deferred[dict, dict, str]: a deferred tuple of
+                (creds, params, session_id).
 
-            If authed is false, the first dictionary is the server response to
-            the login request and should be passed back to the client.
+                'creds' contains the authenticated credentials of each stage.
 
-            In either case, the second dict contains the parameters for this
-            request (which may have been given only in a previous call).
+                'params' contains the parameters for this request (which may
+                have been given only in a previous call).
 
-            session_id is the ID of this session, either passed in by the client
-            or assigned by the call to check_auth
+                'session_id' is the ID of this session, either passed in by the
+                client or assigned by this call
+
+        Raises:
+            InteractiveAuthIncompleteError if the client has not yet completed
+                all the stages in any of the permitted flows.
         """
 
         authdict = None
@@ -142,11 +215,8 @@ class AuthHandler(BaseHandler):
             clientdict = session['clientdict']
 
         if not authdict:
-            defer.returnValue(
-                (
-                    False, self._auth_dict_for_flows(flows, session),
-                    clientdict, session['id']
-                )
+            raise InteractiveAuthIncompleteError(
+                self._auth_dict_for_flows(flows, session),
             )
 
         if 'creds' not in session:
@@ -157,14 +227,12 @@ class AuthHandler(BaseHandler):
         errordict = {}
         if 'type' in authdict:
             login_type = authdict['type']
-            if login_type not in self.checkers:
-                raise LoginError(400, "", Codes.UNRECOGNIZED)
             try:
-                result = yield self.checkers[login_type](authdict, clientip)
+                result = yield self._check_auth_dict(authdict, clientip)
                 if result:
                     creds[login_type] = result
                     self._save_session(session)
-            except LoginError, e:
+            except LoginError as e:
                 if login_type == LoginType.EMAIL_IDENTITY:
                     # riot used to have a bug where it would request a new
                     # validation token (thus sending a new email) each time it
@@ -173,7 +241,7 @@ class AuthHandler(BaseHandler):
                     #
                     # Grandfather in the old behaviour for now to avoid
                     # breaking old riot deployments.
-                    raise e
+                    raise
 
                 # this step failed. Merge the error dict into the response
                 # so that the client can have another go.
@@ -190,12 +258,14 @@ class AuthHandler(BaseHandler):
                     "Auth completed with creds: %r. Client dict has keys: %r",
                     creds, clientdict.keys()
                 )
-                defer.returnValue((True, creds, clientdict, session['id']))
+                defer.returnValue((creds, clientdict, session['id']))
 
         ret = self._auth_dict_for_flows(flows, session)
         ret['completed'] = creds.keys()
         ret.update(errordict)
-        defer.returnValue((False, ret, clientdict, session['id']))
+        raise InteractiveAuthIncompleteError(
+            ret,
+        )
 
     @defer.inlineCallbacks
     def add_oob_auth(self, stagetype, authdict, clientip):
@@ -268,17 +338,35 @@ class AuthHandler(BaseHandler):
         return sess.setdefault('serverdict', {}).get(key, default)
 
     @defer.inlineCallbacks
-    def _check_password_auth(self, authdict, _):
-        if "user" not in authdict or "password" not in authdict:
-            raise LoginError(400, "", Codes.MISSING_PARAM)
+    def _check_auth_dict(self, authdict, clientip):
+        """Attempt to validate the auth dict provided by a client
 
-        user_id = authdict["user"]
-        password = authdict["password"]
+        Args:
+            authdict (object): auth dict provided by the client
+            clientip (str): IP address of the client
 
-        (canonical_id, callback) = yield self.validate_login(user_id, {
-            "type": LoginType.PASSWORD,
-            "password": password,
-        })
+        Returns:
+            Deferred: result of the stage verification.
+
+        Raises:
+            StoreError if there was a problem accessing the database
+            SynapseError if there was a problem with the request
+            LoginError if there was an authentication problem.
+        """
+        login_type = authdict['type']
+        checker = self.checkers.get(login_type)
+        if checker is not None:
+            res = yield checker(authdict, clientip)
+            defer.returnValue(res)
+
+        # build a v1-login-style dict out of the authdict and fall back to the
+        # v1 code
+        user_id = authdict.get("user")
+
+        if user_id is None:
+            raise SynapseError(400, "", Codes.MISSING_PARAM)
+
+        (canonical_id, callback) = yield self.validate_login(user_id, authdict)
         defer.returnValue(canonical_id)
 
     @defer.inlineCallbacks
@@ -650,41 +738,6 @@ class AuthHandler(BaseHandler):
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
 
     @defer.inlineCallbacks
-    def set_password(self, user_id, newpassword, requester=None):
-        password_hash = self.hash(newpassword)
-
-        except_access_token_id = requester.access_token_id if requester else None
-
-        try:
-            yield self.store.user_set_password_hash(user_id, password_hash)
-        except StoreError as e:
-            if e.code == 404:
-                raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
-            raise e
-        yield self.delete_access_tokens_for_user(
-            user_id, except_token_id=except_access_token_id,
-        )
-        yield self.hs.get_pusherpool().remove_pushers_by_user(
-            user_id, except_access_token_id
-        )
-
-    @defer.inlineCallbacks
-    def deactivate_account(self, user_id):
-        """Deactivate a user's account
-
-        Args:
-            user_id (str): ID of user to be deactivated
-
-        Returns:
-            Deferred
-        """
-        # FIXME: Theoretically there is a race here wherein user resets
-        # password using threepid.
-        yield self.delete_access_tokens_for_user(user_id)
-        yield self.store.user_delete_threepids(user_id)
-        yield self.store.user_set_password_hash(user_id, None)
-
-    @defer.inlineCallbacks
     def delete_access_token(self, access_token):
         """Invalidate a single access token
 
@@ -706,6 +759,12 @@ class AuthHandler(BaseHandler):
                     access_token=access_token,
                 )
 
+        # delete pushers associated with this access token
+        if user_info["token_id"] is not None:
+            yield self.hs.get_pusherpool().remove_pushers_by_access_token(
+                str(user_info["user"]), (user_info["token_id"], )
+            )
+
     @defer.inlineCallbacks
     def delete_access_tokens_for_user(self, user_id, except_token_id=None,
                                       device_id=None):
@@ -728,13 +787,18 @@ class AuthHandler(BaseHandler):
         # see if any of our auth providers want to know about this
         for provider in self.password_providers:
             if hasattr(provider, "on_logged_out"):
-                for token, device_id in tokens_and_devices:
+                for token, token_id, device_id in tokens_and_devices:
                     yield provider.on_logged_out(
                         user_id=user_id,
                         device_id=device_id,
                         access_token=token,
                     )
 
+        # delete pushers associated with the access tokens
+        yield self.hs.get_pusherpool().remove_pushers_by_access_token(
+            user_id, (token_id for _, token_id, _ in tokens_and_devices),
+        )
+
     @defer.inlineCallbacks
     def add_threepid(self, user_id, medium, address, validated_at):
         # 'Canonicalise' email addresses down to lower case.
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
new file mode 100644
index 0000000000..b1d3814909
--- /dev/null
+++ b/synapse/handlers/deactivate_account.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from ._base import BaseHandler
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class DeactivateAccountHandler(BaseHandler):
+    """Handler which deals with deactivating user accounts."""
+    def __init__(self, hs):
+        super(DeactivateAccountHandler, self).__init__(hs)
+        self._auth_handler = hs.get_auth_handler()
+        self._device_handler = hs.get_device_handler()
+
+    @defer.inlineCallbacks
+    def deactivate_account(self, user_id):
+        """Deactivate a user's account
+
+        Args:
+            user_id (str): ID of user to be deactivated
+
+        Returns:
+            Deferred
+        """
+        # FIXME: Theoretically there is a race here wherein user resets
+        # password using threepid.
+
+        # first delete any devices belonging to the user, which will also
+        # delete corresponding access tokens.
+        yield self._device_handler.delete_all_devices_for_user(user_id)
+        # then delete any remaining access tokens which weren't associated with
+        # a device.
+        yield self._auth_handler.delete_access_tokens_for_user(user_id)
+
+        yield self.store.user_delete_threepids(user_id)
+        yield self.store.user_set_password_hash(user_id, None)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 579d8477ba..2152efc692 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -171,12 +171,30 @@ class DeviceHandler(BaseHandler):
         yield self.notify_device_update(user_id, [device_id])
 
     @defer.inlineCallbacks
+    def delete_all_devices_for_user(self, user_id, except_device_id=None):
+        """Delete all of the user's devices
+
+        Args:
+            user_id (str):
+            except_device_id (str|None): optional device id which should not
+                be deleted
+
+        Returns:
+            defer.Deferred:
+        """
+        device_map = yield self.store.get_devices_by_user(user_id)
+        device_ids = device_map.keys()
+        if except_device_id is not None:
+            device_ids = [d for d in device_ids if d != except_device_id]
+        yield self.delete_devices(user_id, device_ids)
+
+    @defer.inlineCallbacks
     def delete_devices(self, user_id, device_ids):
         """ Delete several devices
 
         Args:
             user_id (str):
-            device_ids (str): The list of device IDs to delete
+            device_ids (List[str]): The list of device IDs to delete
 
         Returns:
             defer.Deferred:
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index da00aeb0f4..7e5d3f148d 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -375,6 +375,12 @@ class GroupsLocalHandler(object):
     def get_publicised_groups_for_user(self, user_id):
         if self.hs.is_mine_id(user_id):
             result = yield self.store.get_publicised_groups_for_user(user_id)
+
+            # Check AS associated groups for this user - this depends on the
+            # RegExps in the AS registration file (under `users`)
+            for app_service in self.store.get_app_services():
+                result.extend(app_service.get_groups_for_user(user_id))
+
             defer.returnValue({"groups": result})
         else:
             result = yield self.transport_client.get_publicised_groups_for_user(
@@ -415,4 +421,9 @@ class GroupsLocalHandler(object):
                 uid
             )
 
+            # Check AS associated groups for this user - this depends on the
+            # RegExps in the AS registration file (under `users`)
+            for app_service in self.store.get_app_services():
+                results[uid].extend(app_service.get_groups_for_user(uid))
+
         defer.returnValue({"users": results})
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 9718d4abc5..c5267b4b84 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -27,7 +27,7 @@ from synapse.types import (
 from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -163,7 +163,7 @@ class InitialSyncHandler(BaseHandler):
                         lambda states: states[event.event_id]
                     )
 
-                (messages, token), current_state = yield preserve_context_over_deferred(
+                (messages, token), current_state = yield make_deferred_yieldable(
                     defer.gatherResults(
                         [
                             preserve_fn(self.store.get_recent_events_for_room)(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fa96ea69cd..cb158ba962 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1199,7 +1199,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
                 )
                 changed = True
     else:
-        # We expect to be poked occaisonally by the other side.
+        # We expect to be poked occasionally by the other side.
         # This is to protect against forgetful/buggy servers, so that
         # no one gets stuck online forever.
         if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 5e5b1952dd..9800e24453 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -36,6 +36,8 @@ class ProfileHandler(BaseHandler):
             "profile", self.on_profile_query
         )
 
+        self.user_directory_handler = hs.get_user_directory_handler()
+
         self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)
 
     @defer.inlineCallbacks
@@ -139,6 +141,12 @@ class ProfileHandler(BaseHandler):
             target_user.localpart, new_displayname
         )
 
+        if self.hs.config.user_directory_search_all_users:
+            profile = yield self.store.get_profileinfo(target_user.localpart)
+            yield self.user_directory_handler.handle_local_profile_change(
+                target_user.to_string(), profile
+            )
+
         yield self._update_join_states(requester, target_user)
 
     @defer.inlineCallbacks
@@ -183,6 +191,12 @@ class ProfileHandler(BaseHandler):
             target_user.localpart, new_avatar_url
         )
 
+        if self.hs.config.user_directory_search_all_users:
+            profile = yield self.store.get_profileinfo(target_user.localpart)
+            yield self.user_directory_handler.handle_local_profile_change(
+                target_user.to_string(), profile
+            )
+
         yield self._update_join_states(requester, target_user)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index f6e7e58563..4bc6ef51fe 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -38,6 +38,7 @@ class RegistrationHandler(BaseHandler):
         self.auth = hs.get_auth()
         self._auth_handler = hs.get_auth_handler()
         self.profile_handler = hs.get_profile_handler()
+        self.user_directory_handler = hs.get_user_directory_handler()
         self.captcha_client = CaptchaServerHttpClient(hs)
 
         self._next_generated_user_id = None
@@ -165,6 +166,13 @@ class RegistrationHandler(BaseHandler):
                 ),
                 admin=admin,
             )
+
+            if self.hs.config.user_directory_search_all_users:
+                profile = yield self.store.get_profileinfo(localpart)
+                yield self.user_directory_handler.handle_local_profile_change(
+                    user_id, profile
+                )
+
         else:
             # autogen a sequential user ID
             attempts = 0
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 2cf34e51cb..bb40075387 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -154,6 +154,8 @@ class RoomListHandler(BaseHandler):
             # We want larger rooms to be first, hence negating num_joined_users
             rooms_to_order_value[room_id] = (-num_joined_users, room_id)
 
+        logger.info("Getting ordering for %i rooms since %s",
+                    len(room_ids), stream_token)
         yield concurrently_execute(get_order_for_room, room_ids, 10)
 
         sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
@@ -181,34 +183,42 @@ class RoomListHandler(BaseHandler):
                 rooms_to_scan = rooms_to_scan[:since_token.current_limit]
                 rooms_to_scan.reverse()
 
-        # Actually generate the entries. _append_room_entry_to_chunk will append to
-        # chunk but will stop if len(chunk) > limit
-        chunk = []
-        if limit and not search_filter:
+        logger.info("After sorting and filtering, %i rooms remain",
+                    len(rooms_to_scan))
+
+        # _append_room_entry_to_chunk will append to chunk but will stop if
+        # len(chunk) > limit
+        #
+        # Normally we will generate enough results on the first iteration here,
+        #  but if there is a search filter, _append_room_entry_to_chunk may
+        # filter some results out, in which case we loop again.
+        #
+        # We don't want to scan over the entire range either as that
+        # would potentially waste a lot of work.
+        #
+        # XXX if there is no limit, we may end up DoSing the server with
+        # calls to get_current_state_ids for every single room on the
+        # server. Surely we should cap this somehow?
+        #
+        if limit:
             step = limit + 1
-            for i in xrange(0, len(rooms_to_scan), step):
-                # We iterate here because the vast majority of cases we'll stop
-                # at first iteration, but occaisonally _append_room_entry_to_chunk
-                # won't append to the chunk and so we need to loop again.
-                # We don't want to scan over the entire range either as that
-                # would potentially waste a lot of work.
-                yield concurrently_execute(
-                    lambda r: self._append_room_entry_to_chunk(
-                        r, rooms_to_num_joined[r],
-                        chunk, limit, search_filter
-                    ),
-                    rooms_to_scan[i:i + step], 10
-                )
-                if len(chunk) >= limit + 1:
-                    break
         else:
+            step = len(rooms_to_scan)
+
+        chunk = []
+        for i in xrange(0, len(rooms_to_scan), step):
+            batch = rooms_to_scan[i:i + step]
+            logger.info("Processing %i rooms for result", len(batch))
             yield concurrently_execute(
                 lambda r: self._append_room_entry_to_chunk(
                     r, rooms_to_num_joined[r],
                     chunk, limit, search_filter
                 ),
-                rooms_to_scan, 5
+                batch, 5,
             )
+            logger.info("Now %i rooms in result", len(chunk))
+            if len(chunk) >= limit + 1:
+                break
 
         chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
 
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
new file mode 100644
index 0000000000..44414e1dc1
--- /dev/null
+++ b/synapse/handlers/set_password.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import Codes, StoreError, SynapseError
+from ._base import BaseHandler
+
+logger = logging.getLogger(__name__)
+
+
+class SetPasswordHandler(BaseHandler):
+    """Handler which deals with changing user account passwords"""
+    def __init__(self, hs):
+        super(SetPasswordHandler, self).__init__(hs)
+        self._auth_handler = hs.get_auth_handler()
+        self._device_handler = hs.get_device_handler()
+
+    @defer.inlineCallbacks
+    def set_password(self, user_id, newpassword, requester=None):
+        password_hash = self._auth_handler.hash(newpassword)
+
+        except_device_id = requester.device_id if requester else None
+        except_access_token_id = requester.access_token_id if requester else None
+
+        try:
+            yield self.store.user_set_password_hash(user_id, password_hash)
+        except StoreError as e:
+            if e.code == 404:
+                raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
+            raise e
+
+        # we want to log out all of the user's other sessions. First delete
+        # all his other devices.
+        yield self._device_handler.delete_all_devices_for_user(
+            user_id, except_device_id=except_device_id,
+        )
+
+        # and now delete any access tokens which weren't associated with
+        # devices (or were associated with this device).
+        yield self._auth_handler.delete_access_tokens_for_user(
+            user_id, except_token_id=except_access_token_id,
+        )
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index b5be5d9623..714f0195c8 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,12 +20,13 @@ from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.storage.roommember import ProfileInfo
 from synapse.util.metrics import Measure
 from synapse.util.async import sleep
+from synapse.types import get_localpart_from_id
 
 
 logger = logging.getLogger(__name__)
 
 
-class UserDirectoyHandler(object):
+class UserDirectoryHandler(object):
     """Handles querying of and keeping updated the user_directory.
 
     N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
@@ -41,9 +42,10 @@ class UserDirectoyHandler(object):
     one public room.
     """
 
-    INITIAL_SLEEP_MS = 50
-    INITIAL_SLEEP_COUNT = 100
-    INITIAL_BATCH_SIZE = 100
+    INITIAL_ROOM_SLEEP_MS = 50
+    INITIAL_ROOM_SLEEP_COUNT = 100
+    INITIAL_ROOM_BATCH_SIZE = 100
+    INITIAL_USER_SLEEP_MS = 10
 
     def __init__(self, hs):
         self.store = hs.get_datastore()
@@ -53,6 +55,7 @@ class UserDirectoyHandler(object):
         self.notifier = hs.get_notifier()
         self.is_mine_id = hs.is_mine_id
         self.update_user_directory = hs.config.update_user_directory
+        self.search_all_users = hs.config.user_directory_search_all_users
 
         # When start up for the first time we need to populate the user_directory.
         # This is a set of user_id's we've inserted already
@@ -111,6 +114,15 @@ class UserDirectoyHandler(object):
             self._is_processing = False
 
     @defer.inlineCallbacks
+    def handle_local_profile_change(self, user_id, profile):
+        """Called to update index of our local user profiles when they change
+        irrespective of any rooms the user may be in.
+        """
+        yield self.store.update_profile_in_user_dir(
+            user_id, profile.display_name, profile.avatar_url, None,
+        )
+
+    @defer.inlineCallbacks
     def _unsafe_process(self):
         # If self.pos is None then means we haven't fetched it from DB
         if self.pos is None:
@@ -148,16 +160,30 @@ class UserDirectoyHandler(object):
         room_ids = yield self.store.get_all_rooms()
 
         logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
-        num_processed_rooms = 1
+        num_processed_rooms = 0
 
         for room_id in room_ids:
-            logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
+            logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
             yield self._handle_initial_room(room_id)
             num_processed_rooms += 1
-            yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+            yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
         logger.info("Processed all rooms.")
 
+        if self.search_all_users:
+            num_processed_users = 0
+            user_ids = yield self.store.get_all_local_users()
+            logger.info("Doing initial update of user directory. %d users", len(user_ids))
+            for user_id in user_ids:
+                # We add profiles for all users even if they don't match the
+                # include pattern, just in case we want to change it in future
+                logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
+                yield self._handle_local_user(user_id)
+                num_processed_users += 1
+                yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
+
+            logger.info("Processed all users")
+
         self.initially_handled_users = None
         self.initially_handled_users_in_public = None
         self.initially_handled_users_share = None
@@ -201,8 +227,8 @@ class UserDirectoyHandler(object):
         to_update = set()
         count = 0
         for user_id in user_ids:
-            if count % self.INITIAL_SLEEP_COUNT == 0:
-                yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+            if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
+                yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
             if not self.is_mine_id(user_id):
                 count += 1
@@ -216,8 +242,8 @@ class UserDirectoyHandler(object):
                 if user_id == other_user_id:
                     continue
 
-                if count % self.INITIAL_SLEEP_COUNT == 0:
-                    yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+                if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
+                    yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
                 count += 1
 
                 user_set = (user_id, other_user_id)
@@ -237,13 +263,13 @@ class UserDirectoyHandler(object):
                 else:
                     self.initially_handled_users_share_private_room.add(user_set)
 
-                if len(to_insert) > self.INITIAL_BATCH_SIZE:
+                if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
                     yield self.store.add_users_who_share_room(
                         room_id, not is_public, to_insert,
                     )
                     to_insert.clear()
 
-                if len(to_update) > self.INITIAL_BATCH_SIZE:
+                if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
                     yield self.store.update_users_who_share_room(
                         room_id, not is_public, to_update,
                     )
@@ -385,14 +411,28 @@ class UserDirectoyHandler(object):
                 yield self._handle_remove_user(room_id, user_id)
 
     @defer.inlineCallbacks
+    def _handle_local_user(self, user_id):
+        """Adds a new local roomless user into the user_directory_search table.
+        Used to populate up the user index when we have an
+        user_directory_search_all_users specified.
+        """
+        logger.debug("Adding new local user to dir, %r", user_id)
+
+        profile = yield self.store.get_profileinfo(get_localpart_from_id(user_id))
+
+        row = yield self.store.get_user_in_directory(user_id)
+        if not row:
+            yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
+
+    @defer.inlineCallbacks
     def _handle_new_user(self, room_id, user_id, profile):
         """Called when we might need to add user to directory
 
         Args:
-            room_id (str): room_id that user joined or started being public that
+            room_id (str): room_id that user joined or started being public
             user_id (str)
         """
-        logger.debug("Adding user to dir, %r", user_id)
+        logger.debug("Adding new user to dir, %r", user_id)
 
         row = yield self.store.get_user_in_directory(user_id)
         if not row:
@@ -407,7 +447,7 @@ class UserDirectoyHandler(object):
             if not row:
                 yield self.store.add_users_to_public_room(room_id, [user_id])
         else:
-            logger.debug("Not adding user to public dir, %r", user_id)
+            logger.debug("Not adding new user to public dir, %r", user_id)
 
         # Now we update users who share rooms with users. We do this by getting
         # all the current users in the room and seeing which aren't already
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index a97532162f..e2b99ef3bd 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -362,8 +362,10 @@ def _get_hosts_for_srv_record(dns_client, host):
         return res
 
     # no logcontexts here, so we can safely fire these off and gatherResults
-    d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb)
-    d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb)
+    d1 = dns_client.lookupAddress(host).addCallbacks(
+        cb, eb, errbackArgs=("A", ))
+    d2 = dns_client.lookupIPV6Address(host).addCallbacks(
+        cb, eb, errbackArgs=("AAAA", ))
     results = yield defer.DeferredList(
         [d1, d2], consumeErrors=True)
 
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 3ca1c9947c..25466cd292 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -28,6 +28,7 @@ from canonicaljson import (
 )
 
 from twisted.internet import defer
+from twisted.python import failure
 from twisted.web import server, resource
 from twisted.web.server import NOT_DONE_YET
 from twisted.web.util import redirectTo
@@ -131,12 +132,17 @@ def wrap_request_handler(request_handler, include_metrics=False):
                             version_string=self.version_string,
                         )
                     except Exception:
-                        logger.exception(
-                            "Failed handle request %s.%s on %r: %r",
+                        # failure.Failure() fishes the original Failure out
+                        # of our stack, and thus gives us a sensible stack
+                        # trace.
+                        f = failure.Failure()
+                        logger.error(
+                            "Failed handle request %s.%s on %r: %r: %s",
                             request_handler.__module__,
                             request_handler.__name__,
                             self,
-                            request
+                            request,
+                            f.getTraceback().rstrip(),
                         )
                         respond_with_json(
                             request,
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index dc680ddf43..097c844d31 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from twisted.internet import defer
 
 from synapse.types import UserID
 
@@ -81,6 +82,7 @@ class ModuleApi(object):
         reg = self.hs.get_handlers().registration_handler
         return reg.register(localpart=localpart)
 
+    @defer.inlineCallbacks
     def invalidate_access_token(self, access_token):
         """Invalidate an access token for a user
 
@@ -94,8 +96,16 @@ class ModuleApi(object):
         Raises:
             synapse.api.errors.AuthError: the access token is invalid
         """
-
-        return self._auth_handler.delete_access_token(access_token)
+        # see if the access token corresponds to a device
+        user_info = yield self._auth.get_user_by_access_token(access_token)
+        device_id = user_info.get("device_id")
+        user_id = user_info["user"].to_string()
+        if device_id:
+            # delete the device, which will also delete its access tokens
+            yield self.hs.get_device_handler().delete_device(user_id, device_id)
+        else:
+            # no associated device. Just delete the access token.
+            yield self._auth_handler.delete_access_token(access_token)
 
     def run_db_interaction(self, desc, func, *args, **kwargs):
         """Run a function with a database connection
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 626da778cd..ef042681bc 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -255,9 +255,7 @@ class Notifier(object):
         )
 
         if self.federation_sender:
-            preserve_fn(self.federation_sender.notify_new_events)(
-                room_stream_id
-            )
+            self.federation_sender.notify_new_events(room_stream_id)
 
         if event.type == EventTypes.Member and event.membership == Membership.JOIN:
             self._user_joined_room(event.state_key, event.room_id)
@@ -297,8 +295,7 @@ class Notifier(object):
     def on_new_replication_data(self):
         """Used to inform replication listeners that something has happend
         without waking up any of the normal user event streams"""
-        with PreserveLoggingContext():
-            self.notify_replication()
+        self.notify_replication()
 
     @defer.inlineCallbacks
     def wait_for_events(self, user_id, timeout, callback, room_ids=None,
@@ -516,8 +513,14 @@ class Notifier(object):
             self.replication_deferred = ObservableDeferred(defer.Deferred())
             deferred.callback(None)
 
-        for cb in self.replication_callbacks:
-            preserve_fn(cb)()
+            # the callbacks may well outlast the current request, so we run
+            # them in the sentinel logcontext.
+            #
+            # (ideally it would be up to the callbacks to know if they were
+            # starting off background processes and drop the logcontext
+            # accordingly, but that requires more changes)
+            for cb in self.replication_callbacks:
+                cb()
 
     @defer.inlineCallbacks
     def wait_for_replication(self, callback, timeout):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 74c0bc462c..c16f61452c 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -295,7 +296,7 @@ class HttpPusher(object):
         if event.type == 'm.room.member':
             d['notification']['membership'] = event.content['membership']
             d['notification']['user_is_target'] = event.state_key == self.user_id
-        if not self.hs.config.push_redact_content and 'content' in event:
+        if self.hs.config.push_include_content and 'content' in event:
             d['notification']['content'] = event.content
 
         # We no longer send aliases separately, instead, we send the human
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 7c069b662e..134e89b371 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -17,7 +17,7 @@
 from twisted.internet import defer
 
 from .pusher import PusherFactory
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.async import run_on_reactor
 
 import logging
@@ -103,19 +103,25 @@ class PusherPool:
                 yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
 
     @defer.inlineCallbacks
-    def remove_pushers_by_user(self, user_id, except_access_token_id=None):
-        all = yield self.store.get_all_pushers()
-        logger.info(
-            "Removing all pushers for user %s except access tokens id %r",
-            user_id, except_access_token_id
-        )
-        for p in all:
-            if p['user_name'] == user_id and p['access_token'] != except_access_token_id:
+    def remove_pushers_by_access_token(self, user_id, access_tokens):
+        """Remove the pushers for a given user corresponding to a set of
+        access_tokens.
+
+        Args:
+            user_id (str): user to remove pushers for
+            access_tokens (Iterable[int]): access token *ids* to remove pushers
+                for
+        """
+        tokens = set(access_tokens)
+        for p in (yield self.store.get_pushers_by_user_id(user_id)):
+            if p['access_token'] in tokens:
                 logger.info(
                     "Removing pusher for app id %s, pushkey %s, user %s",
                     p['app_id'], p['pushkey'], p['user_name']
                 )
-                yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+                yield self.remove_pusher(
+                    p['app_id'], p['pushkey'], p['user_name'],
+                )
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_id, max_stream_id):
@@ -136,7 +142,7 @@ class PusherPool:
                             )
                         )
 
-            yield preserve_context_over_deferred(defer.gatherResults(deferreds))
+            yield make_deferred_yieldable(defer.gatherResults(deferreds))
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
 
@@ -161,7 +167,7 @@ class PusherPool:
                             preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
                         )
 
-            yield preserve_context_over_deferred(defer.gatherResults(deferreds))
+            yield make_deferred_yieldable(defer.gatherResults(deferreds))
         except Exception:
             logger.exception("Exception in pusher on_new_receipts")
 
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 94ebbffc1b..29d7296b43 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -12,20 +12,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
+import logging
 
 from synapse.api.constants import EventTypes
 from synapse.storage import DataStore
-from synapse.storage.roommember import RoomMemberStore
 from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.event_push_actions import EventPushActionsStore
-from synapse.storage.state import StateStore
+from synapse.storage.roommember import RoomMemberStore
+from synapse.storage.state import StateGroupReadStore
 from synapse.storage.stream import StreamStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-import logging
-
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
 
 logger = logging.getLogger(__name__)
 
@@ -39,7 +37,7 @@ logger = logging.getLogger(__name__)
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedEventStore(BaseSlavedStore):
+class SlavedEventStore(StateGroupReadStore, BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
         super(SlavedEventStore, self).__init__(db_conn, hs)
@@ -90,25 +88,9 @@ class SlavedEventStore(BaseSlavedStore):
     _get_unread_counts_by_pos_txn = (
         DataStore._get_unread_counts_by_pos_txn.__func__
     )
-    _get_state_group_for_events = (
-        StateStore.__dict__["_get_state_group_for_events"]
-    )
-    _get_state_group_for_event = (
-        StateStore.__dict__["_get_state_group_for_event"]
-    )
-    _get_state_groups_from_groups = (
-        StateStore.__dict__["_get_state_groups_from_groups"]
-    )
-    _get_state_groups_from_groups_txn = (
-        DataStore._get_state_groups_from_groups_txn.__func__
-    )
     get_recent_event_ids_for_room = (
         StreamStore.__dict__["get_recent_event_ids_for_room"]
     )
-    get_current_state_ids = (
-        StateStore.__dict__["get_current_state_ids"]
-    )
-    get_state_group_delta = StateStore.__dict__["get_state_group_delta"]
     _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
     has_room_changed_since = DataStore.has_room_changed_since.__func__
 
@@ -134,12 +116,6 @@ class SlavedEventStore(BaseSlavedStore):
         DataStore.get_room_events_stream_for_room.__func__
     )
     get_events_around = DataStore.get_events_around.__func__
-    get_state_for_event = DataStore.get_state_for_event.__func__
-    get_state_for_events = DataStore.get_state_for_events.__func__
-    get_state_groups = DataStore.get_state_groups.__func__
-    get_state_groups_ids = DataStore.get_state_groups_ids.__func__
-    get_state_ids_for_event = DataStore.get_state_ids_for_event.__func__
-    get_state_ids_for_events = DataStore.get_state_ids_for_events.__func__
     get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__
     get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__
     _get_joined_users_from_context = (
@@ -169,10 +145,7 @@ class SlavedEventStore(BaseSlavedStore):
     _get_rooms_for_user_where_membership_is_txn = (
         DataStore._get_rooms_for_user_where_membership_is_txn.__func__
     )
-    _get_state_for_groups = DataStore._get_state_for_groups.__func__
-    _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
     _get_events_around_txn = DataStore._get_events_around_txn.__func__
-    _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
 
     get_backfill_events = DataStore.get_backfill_events.__func__
     _get_backfill_events = DataStore._get_backfill_events.__func__
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 1d03e79b85..786c3fe864 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -216,11 +216,12 @@ class ReplicationStreamer(object):
             self.federation_sender.federation_ack(token)
 
     @measure_func("repl.on_user_sync")
+    @defer.inlineCallbacks
     def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
         """A client has started/stopped syncing on a worker.
         """
         user_sync_counter.inc()
-        self.presence_handler.update_external_syncs_row(
+        yield self.presence_handler.update_external_syncs_row(
             conn_id, user_id, is_syncing, last_sync_ms,
         )
 
@@ -244,11 +245,12 @@ class ReplicationStreamer(object):
         getattr(self.store, cache_func).invalidate(tuple(keys))
 
     @measure_func("repl.on_user_ip")
+    @defer.inlineCallbacks
     def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
         """The client saw a user request
         """
         user_ip_cache_counter.inc()
-        self.store.insert_client_ip(
+        yield self.store.insert_client_ip(
             user_id, access_token, ip, user_agent, device_id, last_seen,
         )
 
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 1197158fdc..5022808ea9 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -137,8 +137,8 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns("/admin/deactivate/(?P<target_user_id>[^/]*)")
 
     def __init__(self, hs):
-        self._auth_handler = hs.get_auth_handler()
         super(DeactivateAccountRestServlet, self).__init__(hs)
+        self._deactivate_account_handler = hs.get_deactivate_account_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request, target_user_id):
@@ -149,7 +149,7 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
         if not is_admin:
             raise AuthError(403, "You are not a server admin")
 
-        yield self._auth_handler.deactivate_account(target_user_id)
+        yield self._deactivate_account_handler.deactivate_account(target_user_id)
         defer.returnValue((200, {}))
 
 
@@ -309,7 +309,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
         super(ResetPasswordRestServlet, self).__init__(hs)
         self.hs = hs
         self.auth = hs.get_auth()
-        self.auth_handler = hs.get_auth_handler()
+        self._set_password_handler = hs.get_set_password_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request, target_user_id):
@@ -330,7 +330,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
 
         logger.info("new_password: %r", new_password)
 
-        yield self.auth_handler.set_password(
+        yield self._set_password_handler.set_password(
             target_user_id, new_password, requester
         )
         defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py
index 6add754782..ca49955935 100644
--- a/synapse/rest/client/v1/logout.py
+++ b/synapse/rest/client/v1/logout.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.auth import get_access_token_from_request
+from synapse.api.errors import AuthError
 
 from .base import ClientV1RestServlet, client_path_patterns
 
@@ -30,15 +31,30 @@ class LogoutRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(LogoutRestServlet, self).__init__(hs)
+        self._auth = hs.get_auth()
         self._auth_handler = hs.get_auth_handler()
+        self._device_handler = hs.get_device_handler()
 
     def on_OPTIONS(self, request):
         return (200, {})
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        access_token = get_access_token_from_request(request)
-        yield self._auth_handler.delete_access_token(access_token)
+        try:
+            requester = yield self.auth.get_user_by_req(request)
+        except AuthError:
+            # this implies the access token has already been deleted.
+            pass
+        else:
+            if requester.device_id is None:
+                # the acccess token wasn't associated with a device.
+                # Just delete the access token
+                access_token = get_access_token_from_request(request)
+                yield self._auth_handler.delete_access_token(access_token)
+            else:
+                yield self._device_handler.delete_device(
+                    requester.user.to_string(), requester.device_id)
+
         defer.returnValue((200, {}))
 
 
@@ -49,6 +65,7 @@ class LogoutAllRestServlet(ClientV1RestServlet):
         super(LogoutAllRestServlet, self).__init__(hs)
         self.auth = hs.get_auth()
         self._auth_handler = hs.get_auth_handler()
+        self._device_handler = hs.get_device_handler()
 
     def on_OPTIONS(self, request):
         return (200, {})
@@ -57,6 +74,12 @@ class LogoutAllRestServlet(ClientV1RestServlet):
     def on_POST(self, request):
         requester = yield self.auth.get_user_by_req(request)
         user_id = requester.user.to_string()
+
+        # first delete all of the user's devices
+        yield self._device_handler.delete_all_devices_for_user(user_id)
+
+        # .. and then delete any access tokens which weren't associated with
+        # devices.
         yield self._auth_handler.delete_access_tokens_for_user(user_id)
         defer.returnValue((200, {}))
 
diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py
index 1f5bc24cc3..77434937ff 100644
--- a/synapse/rest/client/v2_alpha/_base.py
+++ b/synapse/rest/client/v2_alpha/_base.py
@@ -15,12 +15,13 @@
 
 """This module contains base REST classes for constructing client v1 servlets.
 """
-
-from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX
+import logging
 import re
 
-import logging
+from twisted.internet import defer
 
+from synapse.api.errors import InteractiveAuthIncompleteError
+from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX
 
 logger = logging.getLogger(__name__)
 
@@ -57,3 +58,37 @@ def set_timeline_upper_limit(filter_json, filter_timeline_limit):
         filter_json['room']['timeline']["limit"] = min(
             filter_json['room']['timeline']['limit'],
             filter_timeline_limit)
+
+
+def interactive_auth_handler(orig):
+    """Wraps an on_POST method to handle InteractiveAuthIncompleteErrors
+
+    Takes a on_POST method which returns a deferred (errcode, body) response
+    and adds exception handling to turn a InteractiveAuthIncompleteError into
+    a 401 response.
+
+    Normal usage is:
+
+    @interactive_auth_handler
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        # ...
+        yield self.auth_handler.check_auth
+            """
+    def wrapped(*args, **kwargs):
+        res = defer.maybeDeferred(orig, *args, **kwargs)
+        res.addErrback(_catch_incomplete_interactive_auth)
+        return res
+    return wrapped
+
+
+def _catch_incomplete_interactive_auth(f):
+    """helper for interactive_auth_handler
+
+    Catches InteractiveAuthIncompleteErrors and turns them into 401 responses
+
+    Args:
+        f (failure.Failure):
+    """
+    f.trap(InteractiveAuthIncompleteError)
+    return 401, f.value.result
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 726e0a2826..385a3ad2ec 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -19,14 +19,14 @@ from twisted.internet import defer
 
 from synapse.api.auth import has_access_token
 from synapse.api.constants import LoginType
-from synapse.api.errors import Codes, LoginError, SynapseError
+from synapse.api.errors import Codes, SynapseError
 from synapse.http.servlet import (
     RestServlet, assert_params_in_request,
     parse_json_object_from_request,
 )
 from synapse.util.async import run_on_reactor
 from synapse.util.msisdn import phone_number_to_msisdn
-from ._base import client_v2_patterns
+from ._base import client_v2_patterns, interactive_auth_handler
 
 logger = logging.getLogger(__name__)
 
@@ -98,56 +98,61 @@ class PasswordRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
         self.datastore = self.hs.get_datastore()
+        self._set_password_handler = hs.get_set_password_handler()
 
+    @interactive_auth_handler
     @defer.inlineCallbacks
     def on_POST(self, request):
-        yield run_on_reactor()
-
         body = parse_json_object_from_request(request)
 
-        authed, result, params, _ = yield self.auth_handler.check_auth([
-            [LoginType.PASSWORD],
-            [LoginType.EMAIL_IDENTITY],
-            [LoginType.MSISDN],
-        ], body, self.hs.get_ip_from_request(request))
-
-        if not authed:
-            defer.returnValue((401, result))
+        # there are two possibilities here. Either the user does not have an
+        # access token, and needs to do a password reset; or they have one and
+        # need to validate their identity.
+        #
+        # In the first case, we offer a couple of means of identifying
+        # themselves (email and msisdn, though it's unclear if msisdn actually
+        # works).
+        #
+        # In the second case, we require a password to confirm their identity.
 
-        user_id = None
-        requester = None
-
-        if LoginType.PASSWORD in result:
-            # if using password, they should also be logged in
+        if has_access_token(request):
             requester = yield self.auth.get_user_by_req(request)
-            user_id = requester.user.to_string()
-            if user_id != result[LoginType.PASSWORD]:
-                raise LoginError(400, "", Codes.UNKNOWN)
-        elif LoginType.EMAIL_IDENTITY in result:
-            threepid = result[LoginType.EMAIL_IDENTITY]
-            if 'medium' not in threepid or 'address' not in threepid:
-                raise SynapseError(500, "Malformed threepid")
-            if threepid['medium'] == 'email':
-                # For emails, transform the address to lowercase.
-                # We store all email addreses as lowercase in the DB.
-                # (See add_threepid in synapse/handlers/auth.py)
-                threepid['address'] = threepid['address'].lower()
-            # if using email, we must know about the email they're authing with!
-            threepid_user_id = yield self.datastore.get_user_id_by_threepid(
-                threepid['medium'], threepid['address']
+            params = yield self.auth_handler.validate_user_via_ui_auth(
+                requester, body, self.hs.get_ip_from_request(request),
             )
-            if not threepid_user_id:
-                raise SynapseError(404, "Email address not found", Codes.NOT_FOUND)
-            user_id = threepid_user_id
+            user_id = requester.user.to_string()
         else:
-            logger.error("Auth succeeded but no known type!", result.keys())
-            raise SynapseError(500, "", Codes.UNKNOWN)
+            requester = None
+            result, params, _ = yield self.auth_handler.check_auth(
+                [[LoginType.EMAIL_IDENTITY], [LoginType.MSISDN]],
+                body, self.hs.get_ip_from_request(request),
+            )
+
+            if LoginType.EMAIL_IDENTITY in result:
+                threepid = result[LoginType.EMAIL_IDENTITY]
+                if 'medium' not in threepid or 'address' not in threepid:
+                    raise SynapseError(500, "Malformed threepid")
+                if threepid['medium'] == 'email':
+                    # For emails, transform the address to lowercase.
+                    # We store all email addreses as lowercase in the DB.
+                    # (See add_threepid in synapse/handlers/auth.py)
+                    threepid['address'] = threepid['address'].lower()
+                # if using email, we must know about the email they're authing with!
+                threepid_user_id = yield self.datastore.get_user_id_by_threepid(
+                    threepid['medium'], threepid['address']
+                )
+                if not threepid_user_id:
+                    raise SynapseError(404, "Email address not found", Codes.NOT_FOUND)
+                user_id = threepid_user_id
+            else:
+                logger.error("Auth succeeded but no known type!", result.keys())
+                raise SynapseError(500, "", Codes.UNKNOWN)
 
         if 'new_password' not in params:
             raise SynapseError(400, "", Codes.MISSING_PARAM)
         new_password = params['new_password']
 
-        yield self.auth_handler.set_password(
+        yield self._set_password_handler.set_password(
             user_id, new_password, requester
         )
 
@@ -161,52 +166,32 @@ class DeactivateAccountRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/account/deactivate$")
 
     def __init__(self, hs):
+        super(DeactivateAccountRestServlet, self).__init__()
         self.hs = hs
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
-        super(DeactivateAccountRestServlet, self).__init__()
+        self._deactivate_account_handler = hs.get_deactivate_account_handler()
 
+    @interactive_auth_handler
     @defer.inlineCallbacks
     def on_POST(self, request):
         body = parse_json_object_from_request(request)
 
-        # if the caller provides an access token, it ought to be valid.
-        requester = None
-        if has_access_token(request):
-            requester = yield self.auth.get_user_by_req(
-                request,
-            )  # type: synapse.types.Requester
+        requester = yield self.auth.get_user_by_req(request)
 
         # allow ASes to dectivate their own users
-        if requester and requester.app_service:
-            yield self.auth_handler.deactivate_account(
+        if requester.app_service:
+            yield self._deactivate_account_handler.deactivate_account(
                 requester.user.to_string()
             )
             defer.returnValue((200, {}))
 
-        authed, result, params, _ = yield self.auth_handler.check_auth([
-            [LoginType.PASSWORD],
-        ], body, self.hs.get_ip_from_request(request))
-
-        if not authed:
-            defer.returnValue((401, result))
-
-        if LoginType.PASSWORD in result:
-            user_id = result[LoginType.PASSWORD]
-            # if using password, they should also be logged in
-            if requester is None:
-                raise SynapseError(
-                    400,
-                    "Deactivate account requires an access_token",
-                    errcode=Codes.MISSING_TOKEN
-                )
-            if requester.user.to_string() != user_id:
-                raise LoginError(400, "", Codes.UNKNOWN)
-        else:
-            logger.error("Auth succeeded but no known type!", result.keys())
-            raise SynapseError(500, "", Codes.UNKNOWN)
-
-        yield self.auth_handler.deactivate_account(user_id)
+        yield self.auth_handler.validate_user_via_ui_auth(
+            requester, body, self.hs.get_ip_from_request(request),
+        )
+        yield self._deactivate_account_handler.deactivate_account(
+            requester.user.to_string(),
+        )
         defer.returnValue((200, {}))
 
 
diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py
index 5321e5abbb..35d58b367a 100644
--- a/synapse/rest/client/v2_alpha/devices.py
+++ b/synapse/rest/client/v2_alpha/devices.py
@@ -17,9 +17,9 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.api import constants, errors
+from synapse.api import errors
 from synapse.http import servlet
-from ._base import client_v2_patterns
+from ._base import client_v2_patterns, interactive_auth_handler
 
 logger = logging.getLogger(__name__)
 
@@ -60,8 +60,11 @@ class DeleteDevicesRestServlet(servlet.RestServlet):
         self.device_handler = hs.get_device_handler()
         self.auth_handler = hs.get_auth_handler()
 
+    @interactive_auth_handler
     @defer.inlineCallbacks
     def on_POST(self, request):
+        requester = yield self.auth.get_user_by_req(request)
+
         try:
             body = servlet.parse_json_object_from_request(request)
         except errors.SynapseError as e:
@@ -77,14 +80,10 @@ class DeleteDevicesRestServlet(servlet.RestServlet):
                 400, "No devices supplied", errcode=errors.Codes.MISSING_PARAM
             )
 
-        authed, result, params, _ = yield self.auth_handler.check_auth([
-            [constants.LoginType.PASSWORD],
-        ], body, self.hs.get_ip_from_request(request))
-
-        if not authed:
-            defer.returnValue((401, result))
+        yield self.auth_handler.validate_user_via_ui_auth(
+            requester, body, self.hs.get_ip_from_request(request),
+        )
 
-        requester = yield self.auth.get_user_by_req(request)
         yield self.device_handler.delete_devices(
             requester.user.to_string(),
             body['devices'],
@@ -115,6 +114,7 @@ class DeviceRestServlet(servlet.RestServlet):
         )
         defer.returnValue((200, device))
 
+    @interactive_auth_handler
     @defer.inlineCallbacks
     def on_DELETE(self, request, device_id):
         requester = yield self.auth.get_user_by_req(request)
@@ -130,19 +130,13 @@ class DeviceRestServlet(servlet.RestServlet):
             else:
                 raise
 
-        authed, result, params, _ = yield self.auth_handler.check_auth([
-            [constants.LoginType.PASSWORD],
-        ], body, self.hs.get_ip_from_request(request))
-
-        if not authed:
-            defer.returnValue((401, result))
-
-        # check that the UI auth matched the access token
-        user_id = result[constants.LoginType.PASSWORD]
-        if user_id != requester.user.to_string():
-            raise errors.AuthError(403, "Invalid auth")
+        yield self.auth_handler.validate_user_via_ui_auth(
+            requester, body, self.hs.get_ip_from_request(request),
+        )
 
-        yield self.device_handler.delete_device(user_id, device_id)
+        yield self.device_handler.delete_device(
+            requester.user.to_string(), device_id,
+        )
         defer.returnValue((200, {}))
 
     @defer.inlineCallbacks
diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py
index 089ec71c81..f762dbfa9a 100644
--- a/synapse/rest/client/v2_alpha/groups.py
+++ b/synapse/rest/client/v2_alpha/groups.py
@@ -38,7 +38,7 @@ class GroupServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         group_description = yield self.groups_handler.get_group_profile(
@@ -74,7 +74,7 @@ class GroupSummaryServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         get_group_summary = yield self.groups_handler.get_group_summary(
@@ -148,7 +148,7 @@ class GroupCategoryServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id, category_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         category = yield self.groups_handler.get_group_category(
@@ -200,7 +200,7 @@ class GroupCategoriesServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         category = yield self.groups_handler.get_group_categories(
@@ -225,7 +225,7 @@ class GroupRoleServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id, role_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         category = yield self.groups_handler.get_group_role(
@@ -277,7 +277,7 @@ class GroupRolesServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         category = yield self.groups_handler.get_group_roles(
@@ -348,7 +348,7 @@ class GroupRoomServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         result = yield self.groups_handler.get_rooms_in_group(group_id, requester_user_id)
@@ -369,7 +369,7 @@ class GroupUsersServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         result = yield self.groups_handler.get_users_in_group(group_id, requester_user_id)
@@ -672,7 +672,7 @@ class PublicisedGroupsForUserServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, user_id):
-        yield self.auth.get_user_by_req(request)
+        yield self.auth.get_user_by_req(request, allow_guest=True)
 
         result = yield self.groups_handler.get_publicised_groups_for_user(
             user_id
@@ -697,7 +697,7 @@ class PublicisedGroupsForUsersServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        yield self.auth.get_user_by_req(request)
+        yield self.auth.get_user_by_req(request, allow_guest=True)
 
         content = parse_json_object_from_request(request)
         user_ids = content["user_ids"]
@@ -724,7 +724,7 @@ class GroupsForUserServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         result = yield self.groups_handler.get_joined_groups(requester_user_id)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 9e2f7308ce..e9d88a8895 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -27,7 +27,7 @@ from synapse.http.servlet import (
 )
 from synapse.util.msisdn import phone_number_to_msisdn
 
-from ._base import client_v2_patterns
+from ._base import client_v2_patterns, interactive_auth_handler
 
 import logging
 import hmac
@@ -176,6 +176,7 @@ class RegisterRestServlet(RestServlet):
         self.device_handler = hs.get_device_handler()
         self.macaroon_gen = hs.get_macaroon_generator()
 
+    @interactive_auth_handler
     @defer.inlineCallbacks
     def on_POST(self, request):
         yield run_on_reactor()
@@ -325,14 +326,10 @@ class RegisterRestServlet(RestServlet):
                     [LoginType.MSISDN, LoginType.EMAIL_IDENTITY],
                 ])
 
-        authed, auth_result, params, session_id = yield self.auth_handler.check_auth(
+        auth_result, params, session_id = yield self.auth_handler.check_auth(
             flows, body, self.hs.get_ip_from_request(request)
         )
 
-        if not authed:
-            defer.returnValue((401, auth_result))
-            return
-
         if registered_user_id is not None:
             logger.info(
                 "Already registered user ID %r for this session",
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index e984ea47db..2ecb15deee 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -30,6 +30,7 @@ class VersionsRestServlet(RestServlet):
                 "r0.0.1",
                 "r0.1.0",
                 "r0.2.0",
+                "r0.3.0",
             ]
         })
 
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 723f7043f4..40d2e664eb 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -25,7 +25,8 @@ from synapse.util.stringutils import random_string
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.http.client import SpiderHttpClient
 from synapse.http.server import (
-    request_handler, respond_with_json_bytes
+    request_handler, respond_with_json_bytes,
+    respond_with_json,
 )
 from synapse.util.async import ObservableDeferred
 from synapse.util.stringutils import is_ascii
@@ -78,6 +79,9 @@ class PreviewUrlResource(Resource):
             self._expire_url_cache_data, 10 * 1000
         )
 
+    def render_OPTIONS(self, request):
+        return respond_with_json(request, 200, {}, send_cors=True)
+
     def render_GET(self, request):
         self._async_render_GET(request)
         return NOT_DONE_YET
@@ -348,11 +352,16 @@ class PreviewUrlResource(Resource):
     def _expire_url_cache_data(self):
         """Clean up expired url cache content, media and thumbnails.
         """
-
         # TODO: Delete from backup media store
 
         now = self.clock.time_msec()
 
+        logger.info("Running url preview cache expiry")
+
+        if not (yield self.store.has_completed_background_updates()):
+            logger.info("Still running DB updates; skipping expiry")
+            return
+
         # First we delete expired url cache entries
         media_ids = yield self.store.get_expired_url_cache(now)
 
@@ -426,8 +435,7 @@ class PreviewUrlResource(Resource):
 
         yield self.store.delete_url_cache_media(removed_media)
 
-        if removed_media:
-            logger.info("Deleted %d media from url cache", len(removed_media))
+        logger.info("Deleted %d media from url cache", len(removed_media))
 
 
 def decode_and_calc_og(body, media_uri, request_encoding=None):
diff --git a/synapse/server.py b/synapse/server.py
index 10e3e9a4f1..99693071b6 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -39,18 +39,20 @@ from synapse.federation.transaction_queue import TransactionQueue
 from synapse.handlers import Handlers
 from synapse.handlers.appservice import ApplicationServicesHandler
 from synapse.handlers.auth import AuthHandler, MacaroonGeneartor
+from synapse.handlers.deactivate_account import DeactivateAccountHandler
 from synapse.handlers.devicemessage import DeviceMessageHandler
 from synapse.handlers.device import DeviceHandler
 from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.room_list import RoomListHandler
+from synapse.handlers.set_password import SetPasswordHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
 from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
 from synapse.handlers.receipts import ReceiptsHandler
 from synapse.handlers.read_marker import ReadMarkerHandler
-from synapse.handlers.user_directory import UserDirectoyHandler
+from synapse.handlers.user_directory import UserDirectoryHandler
 from synapse.handlers.groups_local import GroupsLocalHandler
 from synapse.handlers.profile import ProfileHandler
 from synapse.groups.groups_server import GroupsServerHandler
@@ -60,7 +62,10 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
 from synapse.push.action_generator import ActionGenerator
 from synapse.push.pusherpool import PusherPool
-from synapse.rest.media.v1.media_repository import MediaRepository
+from synapse.rest.media.v1.media_repository import (
+    MediaRepository,
+    MediaRepositoryResource,
+)
 from synapse.state import StateHandler
 from synapse.storage import DataStore
 from synapse.streams.events import EventSources
@@ -90,17 +95,12 @@ class HomeServer(object):
     """
 
     DEPENDENCIES = [
-        'config',
-        'clock',
         'http_client',
         'db_pool',
-        'persistence_service',
         'replication_layer',
-        'datastore',
         'handlers',
         'v1auth',
         'auth',
-        'rest_servlet_factory',
         'state_handler',
         'presence_handler',
         'sync_handler',
@@ -117,19 +117,10 @@ class HomeServer(object):
         'application_service_handler',
         'device_message_handler',
         'profile_handler',
+        'deactivate_account_handler',
+        'set_password_handler',
         'notifier',
-        'distributor',
-        'client_resource',
-        'resource_for_federation',
-        'resource_for_static_content',
-        'resource_for_web_client',
-        'resource_for_content_repo',
-        'resource_for_server_key',
-        'resource_for_server_key_v2',
-        'resource_for_media_repository',
-        'resource_for_metrics',
         'event_sources',
-        'ratelimiter',
         'keyring',
         'pusherpool',
         'event_builder_factory',
@@ -137,6 +128,7 @@ class HomeServer(object):
         'http_client_context_factory',
         'simple_http_client',
         'media_repository',
+        'media_repository_resource',
         'federation_transport_client',
         'federation_sender',
         'receipts_handler',
@@ -183,6 +175,21 @@ class HomeServer(object):
     def is_mine_id(self, string):
         return string.split(":", 1)[1] == self.hostname
 
+    def get_clock(self):
+        return self.clock
+
+    def get_datastore(self):
+        return self.datastore
+
+    def get_config(self):
+        return self.config
+
+    def get_distributor(self):
+        return self.distributor
+
+    def get_ratelimiter(self):
+        return self.ratelimiter
+
     def build_replication_layer(self):
         return initialize_http_replication(self)
 
@@ -265,6 +272,12 @@ class HomeServer(object):
     def build_profile_handler(self):
         return ProfileHandler(self)
 
+    def build_deactivate_account_handler(self):
+        return DeactivateAccountHandler(self)
+
+    def build_set_password_handler(self):
+        return SetPasswordHandler(self)
+
     def build_event_sources(self):
         return EventSources(self)
 
@@ -294,6 +307,11 @@ class HomeServer(object):
             **self.db_config.get("args", {})
         )
 
+    def build_media_repository_resource(self):
+        # build the media repo resource. This indirects through the HomeServer
+        # to ensure that we only have a single instance of
+        return MediaRepositoryResource(self)
+
     def build_media_repository(self):
         return MediaRepository(self)
 
@@ -321,7 +339,7 @@ class HomeServer(object):
         return ActionGenerator(self)
 
     def build_user_directory_handler(self):
-        return UserDirectoyHandler(self)
+        return UserDirectoryHandler(self)
 
     def build_groups_local_handler(self):
         return GroupsLocalHandler(self)
diff --git a/synapse/server.pyi b/synapse/server.pyi
index e8c0386b7f..41416ef252 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -3,10 +3,14 @@ import synapse.federation.transaction_queue
 import synapse.federation.transport.client
 import synapse.handlers
 import synapse.handlers.auth
+import synapse.handlers.deactivate_account
 import synapse.handlers.device
 import synapse.handlers.e2e_keys
-import synapse.storage
+import synapse.handlers.set_password
+import synapse.rest.media.v1.media_repository
 import synapse.state
+import synapse.storage
+
 
 class HomeServer(object):
     def get_auth(self) -> synapse.api.auth.Auth:
@@ -30,8 +34,20 @@ class HomeServer(object):
     def get_state_handler(self) -> synapse.state.StateHandler:
         pass
 
+    def get_deactivate_account_handler(self) -> synapse.handlers.deactivate_account.DeactivateAccountHandler:
+        pass
+
+    def get_set_password_handler(self) -> synapse.handlers.set_password.SetPasswordHandler:
+        pass
+
     def get_federation_sender(self) -> synapse.federation.transaction_queue.TransactionQueue:
         pass
 
     def get_federation_transport_client(self) -> synapse.federation.transport.client.TransportLayerClient:
         pass
+
+    def get_media_repository_resource(self) -> synapse.rest.media.v1.media_repository.MediaRepositoryResource:
+        pass
+
+    def get_media_repository(self) -> synapse.rest.media.v1.media_repository.MediaRepository:
+        pass
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e94917d9cd..b971f0cb18 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -16,8 +16,6 @@ import logging
 
 from synapse.api.errors import StoreError
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
-from synapse.util.caches import CACHE_SIZE_FACTOR
-from synapse.util.caches.dictionary_cache import DictionaryCache
 from synapse.util.caches.descriptors import Cache
 from synapse.storage.engines import PostgresEngine
 import synapse.metrics
@@ -180,10 +178,6 @@ class SQLBaseStore(object):
         self._get_event_cache = Cache("*getEvent*", keylen=3,
                                       max_entries=hs.config.event_cache_size)
 
-        self._state_group_cache = DictionaryCache(
-            "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
-        )
-
         self._event_fetch_lock = threading.Condition()
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0
@@ -475,23 +469,53 @@ class SQLBaseStore(object):
 
         txn.executemany(sql, vals)
 
+    @defer.inlineCallbacks
     def _simple_upsert(self, table, keyvalues, values,
                        insertion_values={}, desc="_simple_upsert", lock=True):
         """
+
+        `lock` should generally be set to True (the default), but can be set
+        to False if either of the following are true:
+
+        * there is a UNIQUE INDEX on the key columns. In this case a conflict
+          will cause an IntegrityError in which case this function will retry
+          the update.
+
+        * we somehow know that we are the only thread which will be updating
+          this table.
+
         Args:
             table (str): The table to upsert into
             keyvalues (dict): The unique key tables and their new values
             values (dict): The nonunique columns and their new values
-            insertion_values (dict): key/values to use when inserting
+            insertion_values (dict): additional key/values to use only when
+                inserting
+            lock (bool): True to lock the table when doing the upsert.
         Returns:
             Deferred(bool): True if a new entry was created, False if an
                 existing one was updated.
         """
-        return self.runInteraction(
-            desc,
-            self._simple_upsert_txn, table, keyvalues, values, insertion_values,
-            lock
-        )
+        attempts = 0
+        while True:
+            try:
+                result = yield self.runInteraction(
+                    desc,
+                    self._simple_upsert_txn, table, keyvalues, values, insertion_values,
+                    lock=lock
+                )
+                defer.returnValue(result)
+            except self.database_engine.module.IntegrityError as e:
+                attempts += 1
+                if attempts >= 5:
+                    # don't retry forever, because things other than races
+                    # can cause IntegrityErrors
+                    raise
+
+                # presumably we raced with another transaction: let's retry.
+                logger.warn(
+                    "IntegrityError when upserting into %s; retrying: %s",
+                    table, e
+                )
 
     def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
                            lock=True):
@@ -499,7 +523,7 @@ class SQLBaseStore(object):
         if lock:
             self.database_engine.lock_table(txn, table)
 
-        # Try to update
+        # First try to update.
         sql = "UPDATE %s SET %s WHERE %s" % (
             table,
             ", ".join("%s = ?" % (k,) for k in values),
@@ -508,28 +532,29 @@ class SQLBaseStore(object):
         sqlargs = values.values() + keyvalues.values()
 
         txn.execute(sql, sqlargs)
-        if txn.rowcount == 0:
-            # We didn't update and rows so insert a new one
-            allvalues = {}
-            allvalues.update(keyvalues)
-            allvalues.update(values)
-            allvalues.update(insertion_values)
+        if txn.rowcount > 0:
+            # successfully updated at least one row.
+            return False
 
-            sql = "INSERT INTO %s (%s) VALUES (%s)" % (
-                table,
-                ", ".join(k for k in allvalues),
-                ", ".join("?" for _ in allvalues)
-            )
-            txn.execute(sql, allvalues.values())
+        # We didn't update any rows so insert a new one
+        allvalues = {}
+        allvalues.update(keyvalues)
+        allvalues.update(values)
+        allvalues.update(insertion_values)
 
-            return True
-        else:
-            return False
+        sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+            table,
+            ", ".join(k for k in allvalues),
+            ", ".join("?" for _ in allvalues)
+        )
+        txn.execute(sql, allvalues.values())
+        # successfully inserted
+        return True
 
     def _simple_select_one(self, table, keyvalues, retcols,
                            allow_none=False, desc="_simple_select_one"):
         """Executes a SELECT query on the named table, which is expected to
-        return a single row, returning a single column from it.
+        return a single row, returning multiple columns from it.
 
         Args:
             table : string giving the table name
@@ -582,20 +607,18 @@ class SQLBaseStore(object):
 
     @staticmethod
     def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
-        if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
-        else:
-            where = ""
-
         sql = (
-            "SELECT %(retcol)s FROM %(table)s %(where)s"
+            "SELECT %(retcol)s FROM %(table)s"
         ) % {
             "retcol": retcol,
             "table": table,
-            "where": where,
         }
 
-        txn.execute(sql, keyvalues.values())
+        if keyvalues:
+            sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
+            txn.execute(sql, keyvalues.values())
+        else:
+            txn.execute(sql)
 
         return [r[0] for r in txn]
 
@@ -606,7 +629,7 @@ class SQLBaseStore(object):
 
         Args:
             table (str): table name
-            keyvalues (dict): column names and values to select the rows with
+            keyvalues (dict|None): column names and values to select the rows with
             retcol (str): column whos value we wish to retrieve.
 
         Returns:
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index c8a1eb016b..56a0bde549 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore):
         """
         content_json = json.dumps(content)
 
-        def add_account_data_txn(txn, next_id):
-            self._simple_upsert_txn(
-                txn,
+        with self._account_data_id_gen.get_next() as next_id:
+            # no need to lock here as room_account_data has a unique constraint
+            # on (user_id, room_id, account_data_type) so _simple_upsert will
+            # retry if there is a conflict.
+            yield self._simple_upsert(
+                desc="add_room_account_data",
                 table="room_account_data",
                 keyvalues={
                     "user_id": user_id,
@@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore):
                 values={
                     "stream_id": next_id,
                     "content": content_json,
-                }
-            )
-            txn.call_after(
-                self._account_data_stream_cache.entity_has_changed,
-                user_id, next_id,
+                },
+                lock=False,
             )
-            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
-            self._update_max_stream_id(txn, next_id)
 
-        with self._account_data_id_gen.get_next() as next_id:
-            yield self.runInteraction(
-                "add_room_account_data", add_account_data_txn, next_id
-            )
+            # it's theoretically possible for the above to succeed and the
+            # below to fail - in which case we might reuse a stream id on
+            # restart, and the above update might not get propagated. That
+            # doesn't sound any worse than the whole update getting lost,
+            # which is what would happen if we combined the two into one
+            # transaction.
+            yield self._update_max_stream_id(next_id)
+
+            self._account_data_stream_cache.entity_has_changed(user_id, next_id)
+            self.get_account_data_for_user.invalidate((user_id,))
 
         result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
@@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore):
         """
         content_json = json.dumps(content)
 
-        def add_account_data_txn(txn, next_id):
-            self._simple_upsert_txn(
-                txn,
+        with self._account_data_id_gen.get_next() as next_id:
+            # no need to lock here as account_data has a unique constraint on
+            # (user_id, account_data_type) so _simple_upsert will retry if
+            # there is a conflict.
+            yield self._simple_upsert(
+                desc="add_user_account_data",
                 table="account_data",
                 keyvalues={
                     "user_id": user_id,
@@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore):
                 values={
                     "stream_id": next_id,
                     "content": content_json,
-                }
+                },
+                lock=False,
             )
-            txn.call_after(
-                self._account_data_stream_cache.entity_has_changed,
+
+            # it's theoretically possible for the above to succeed and the
+            # below to fail - in which case we might reuse a stream id on
+            # restart, and the above update might not get propagated. That
+            # doesn't sound any worse than the whole update getting lost,
+            # which is what would happen if we combined the two into one
+            # transaction.
+            yield self._update_max_stream_id(next_id)
+
+            self._account_data_stream_cache.entity_has_changed(
                 user_id, next_id,
             )
-            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
-            txn.call_after(
-                self.get_global_account_data_by_type_for_user.invalidate,
+            self.get_account_data_for_user.invalidate((user_id,))
+            self.get_global_account_data_by_type_for_user.invalidate(
                 (account_data_type, user_id,)
             )
-            self._update_max_stream_id(txn, next_id)
-
-        with self._account_data_id_gen.get_next() as next_id:
-            yield self.runInteraction(
-                "add_user_account_data", add_account_data_txn, next_id
-            )
 
         result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
-    def _update_max_stream_id(self, txn, next_id):
+    def _update_max_stream_id(self, next_id):
         """Update the max stream_id
 
         Args:
-            txn: The database cursor
             next_id(int): The the revision to advance to.
         """
-        update_max_id_sql = (
-            "UPDATE account_data_max_stream_id"
-            " SET stream_id = ?"
-            " WHERE stream_id < ?"
+        def _update(txn):
+            update_max_id_sql = (
+                "UPDATE account_data_max_stream_id"
+                " SET stream_id = ?"
+                " WHERE stream_id < ?"
+            )
+            txn.execute(update_max_id_sql, (next_id, next_id))
+        return self.runInteraction(
+            "update_account_data_max_stream_id",
+            _update,
         )
-        txn.execute(update_max_id_sql, (next_id, next_id))
 
     @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
     def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 6f235ac051..11a1b942f1 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -85,6 +85,7 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_performance = {}
         self._background_update_queue = []
         self._background_update_handlers = {}
+        self._all_done = False
 
     @defer.inlineCallbacks
     def start_doing_background_updates(self):
@@ -106,9 +107,41 @@ class BackgroundUpdateStore(SQLBaseStore):
                         "No more background updates to do."
                         " Unscheduling background update task."
                     )
+                    self._all_done = True
                     defer.returnValue(None)
 
     @defer.inlineCallbacks
+    def has_completed_background_updates(self):
+        """Check if all the background updates have completed
+
+        Returns:
+            Deferred[bool]: True if all background updates have completed
+        """
+        # if we've previously determined that there is nothing left to do, that
+        # is easy
+        if self._all_done:
+            defer.returnValue(True)
+
+        # obviously, if we have things in our queue, we're not done.
+        if self._background_update_queue:
+            defer.returnValue(False)
+
+        # otherwise, check if there are updates to be run. This is important,
+        # as we may be running on a worker which doesn't perform the bg updates
+        # itself, but still wants to wait for them to happen.
+        updates = yield self._simple_select_onecol(
+            "background_updates",
+            keyvalues=None,
+            retcol="1",
+            desc="check_background_updates",
+        )
+        if not updates:
+            self._all_done = True
+            defer.returnValue(True)
+
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
     def do_next_background_update(self, desired_duration_ms):
         """Does some amount of work on the next queued background update
 
@@ -269,7 +302,7 @@ class BackgroundUpdateStore(SQLBaseStore):
             # Sqlite doesn't support concurrent creation of indexes.
             #
             # We don't use partial indices on SQLite as it wasn't introduced
-            # until 3.8, and wheezy has 3.7
+            # until 3.8, and wheezy and CentOS 7 have 3.7
             #
             # We assume that sqlite doesn't give us invalid indices; however
             # we may still end up with the index existing but the
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 52e5cdad70..a66ff7c1e0 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -12,13 +12,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from synapse.storage.background_updates import BackgroundUpdateStore
 
-from ._base import SQLBaseStore
 
-
-class MediaRepositoryStore(SQLBaseStore):
+class MediaRepositoryStore(BackgroundUpdateStore):
     """Persistence for attachments and avatars"""
 
+    def __init__(self, db_conn, hs):
+        super(MediaRepositoryStore, self).__init__(db_conn, hs)
+
+        self.register_background_index_update(
+            update_name='local_media_repository_url_idx',
+            index_name='local_media_repository_url_idx',
+            table='local_media_repository',
+            columns=['created_ts'],
+            where_clause='url_cache IS NOT NULL',
+        )
+
     def get_default_thumbnails(self, top_level_type, sub_type):
         return []
 
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index beea3102fc..ec02e73bc2 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -15,6 +15,9 @@
 
 from twisted.internet import defer
 
+from synapse.storage.roommember import ProfileInfo
+from synapse.api.errors import StoreError
+
 from ._base import SQLBaseStore
 
 
@@ -26,6 +29,30 @@ class ProfileStore(SQLBaseStore):
             desc="create_profile",
         )
 
+    @defer.inlineCallbacks
+    def get_profileinfo(self, user_localpart):
+        try:
+            profile = yield self._simple_select_one(
+                table="profiles",
+                keyvalues={"user_id": user_localpart},
+                retcols=("displayname", "avatar_url"),
+                desc="get_profileinfo",
+            )
+        except StoreError as e:
+            if e.code == 404:
+                # no match
+                defer.returnValue(ProfileInfo(None, None))
+                return
+            else:
+                raise
+
+        defer.returnValue(
+            ProfileInfo(
+                avatar_url=profile['avatar_url'],
+                display_name=profile['displayname'],
+            )
+        )
+
     def get_profile_displayname(self, user_localpart):
         return self._simple_select_one_onecol(
             table="profiles",
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 34d2f82b7f..3d8b4d5d5b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -204,34 +204,35 @@ class PusherStore(SQLBaseStore):
                    pushkey, pushkey_ts, lang, data, last_stream_ordering,
                    profile_tag=""):
         with self._pushers_id_gen.get_next() as stream_id:
-            def f(txn):
-                newly_inserted = self._simple_upsert_txn(
-                    txn,
-                    "pushers",
-                    {
-                        "app_id": app_id,
-                        "pushkey": pushkey,
-                        "user_name": user_id,
-                    },
-                    {
-                        "access_token": access_token,
-                        "kind": kind,
-                        "app_display_name": app_display_name,
-                        "device_display_name": device_display_name,
-                        "ts": pushkey_ts,
-                        "lang": lang,
-                        "data": encode_canonical_json(data),
-                        "last_stream_ordering": last_stream_ordering,
-                        "profile_tag": profile_tag,
-                        "id": stream_id,
-                    },
-                )
-                if newly_inserted:
-                    # get_if_user_has_pusher only cares if the user has
-                    # at least *one* pusher.
-                    txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+            # no need to lock because `pushers` has a unique key on
+            # (app_id, pushkey, user_name) so _simple_upsert will retry
+            newly_inserted = yield self._simple_upsert(
+                table="pushers",
+                keyvalues={
+                    "app_id": app_id,
+                    "pushkey": pushkey,
+                    "user_name": user_id,
+                },
+                values={
+                    "access_token": access_token,
+                    "kind": kind,
+                    "app_display_name": app_display_name,
+                    "device_display_name": device_display_name,
+                    "ts": pushkey_ts,
+                    "lang": lang,
+                    "data": encode_canonical_json(data),
+                    "last_stream_ordering": last_stream_ordering,
+                    "profile_tag": profile_tag,
+                    "id": stream_id,
+                },
+                desc="add_pusher",
+                lock=False,
+            )
 
-            yield self.runInteraction("add_pusher", f)
+            if newly_inserted:
+                # get_if_user_has_pusher only cares if the user has
+                # at least *one* pusher.
+                self.get_if_user_has_pusher.invalidate(user_id,)
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
@@ -243,11 +244,19 @@ class PusherStore(SQLBaseStore):
                 "pushers",
                 {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}
             )
-            self._simple_upsert_txn(
+
+            # it's possible for us to end up with duplicate rows for
+            # (app_id, pushkey, user_id) at different stream_ids, but that
+            # doesn't really matter.
+            self._simple_insert_txn(
                 txn,
-                "deleted_pushers",
-                {"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
-                {"stream_id": stream_id},
+                table="deleted_pushers",
+                values={
+                    "stream_id": stream_id,
+                    "app_id": app_id,
+                    "pushkey": pushkey,
+                    "user_id": user_id,
+                },
             )
 
         with self._pushers_id_gen.get_next() as stream_id:
@@ -310,9 +319,12 @@ class PusherStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def set_throttle_params(self, pusher_id, room_id, params):
+        # no need to lock because `pusher_throttle` has a primary key on
+        # (pusher, room_id) so _simple_upsert will retry
         yield self._simple_upsert(
             "pusher_throttle",
             {"pusher": pusher_id, "room_id": room_id},
             params,
-            desc="set_throttle_params"
+            desc="set_throttle_params",
+            lock=False,
         )
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 8b9544c209..3aa810981f 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -254,8 +254,8 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
                 If None, tokens associated with any device (or no device) will
                 be deleted
         Returns:
-            defer.Deferred[list[str, str|None]]: a list of the deleted tokens
-                and device IDs
+            defer.Deferred[list[str, int, str|None, int]]: a list of
+                (token, token id, device id) for each of the deleted tokens
         """
         def f(txn):
             keyvalues = {
@@ -272,12 +272,12 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
                 values.append(except_token_id)
 
             txn.execute(
-                "SELECT token, device_id FROM access_tokens WHERE %s" % where_clause,
+                "SELECT token, id, device_id FROM access_tokens WHERE %s" % where_clause,
                 values
             )
-            tokens_and_devices = [(r[0], r[1]) for r in txn]
+            tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
 
-            for token, _ in tokens_and_devices:
+            for token, _, _ in tokens_and_devices:
                 self._invalidate_cache_and_stream(
                     txn, self.get_user_by_access_token, (token,)
                 )
diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql
index e2b775f038..b12f9b2ebf 100644
--- a/synapse/storage/schema/delta/44/expire_url_cache.sql
+++ b/synapse/storage/schema/delta/44/expire_url_cache.sql
@@ -13,7 +13,10 @@
  * limitations under the License.
  */
 
-CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
+-- this didn't work on SQLite 3.7 (because of lack of partial indexes), so was
+-- removed and replaced with 46/local_media_repository_url_idx.sql.
+--
+-- CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
 
 -- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support
 -- indices on expressions until 3.9.
diff --git a/synapse/storage/schema/delta/46/drop_unique_deleted_pushers.sql b/synapse/storage/schema/delta/46/drop_unique_deleted_pushers.sql
new file mode 100644
index 0000000000..bb307889c1
--- /dev/null
+++ b/synapse/storage/schema/delta/46/drop_unique_deleted_pushers.sql
@@ -0,0 +1,35 @@
+/* Copyright 2017 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- drop the unique constraint on deleted_pushers so that we can just insert
+-- into it rather than upserting.
+
+CREATE TABLE deleted_pushers2 (
+    stream_id BIGINT NOT NULL,
+    app_id TEXT NOT NULL,
+    pushkey TEXT NOT NULL,
+    user_id TEXT NOT NULL
+);
+
+INSERT INTO deleted_pushers2 (stream_id, app_id, pushkey, user_id)
+    SELECT stream_id, app_id, pushkey, user_id from deleted_pushers;
+
+DROP TABLE deleted_pushers;
+ALTER TABLE deleted_pushers2 RENAME TO deleted_pushers;
+
+-- create the index after doing the inserts because that's more efficient.
+-- it also means we can give it the same name as the old one without renaming.
+CREATE INDEX deleted_pushers_stream_id ON deleted_pushers (stream_id);
+
diff --git a/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql
new file mode 100644
index 0000000000..bbfc7f5d1a
--- /dev/null
+++ b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql
@@ -0,0 +1,24 @@
+/* Copyright 2017 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- register a background update which will recreate the
+-- local_media_repository_url_idx index.
+--
+-- We do this as a bg update not because it is a particularly onerous
+-- operation, but because we'd like it to be a partial index if possible, and
+-- the background_index_update code will understand whether we are on
+-- postgres or sqlite and behave accordingly.
+INSERT INTO background_updates (update_name, progress_json) VALUES
+    ('local_media_repository_url_idx', '{}');
diff --git a/synapse/storage/schema/delta/46/user_dir_null_room_ids.sql b/synapse/storage/schema/delta/46/user_dir_null_room_ids.sql
new file mode 100644
index 0000000000..cb0d5a2576
--- /dev/null
+++ b/synapse/storage/schema/delta/46/user_dir_null_room_ids.sql
@@ -0,0 +1,35 @@
+/* Copyright 2017 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- change the user_directory table to also cover global local user profiles
+-- rather than just profiles within specific rooms.
+
+CREATE TABLE user_directory2 (
+    user_id TEXT NOT NULL,
+    room_id TEXT,
+    display_name TEXT,
+    avatar_url TEXT
+);
+
+INSERT INTO user_directory2(user_id, room_id, display_name, avatar_url)
+    SELECT user_id, room_id, display_name, avatar_url from user_directory;
+
+DROP TABLE user_directory;
+ALTER TABLE user_directory2 RENAME TO user_directory;
+
+-- create indexes after doing the inserts because that's more efficient.
+-- it also means we can give it the same name as the old one without renaming.
+CREATE INDEX user_directory_room_idx ON user_directory(room_id);
+CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index dd01b68762..360e3e4355 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -13,16 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedList
-from synapse.util.caches import intern_string
-from synapse.util.stringutils import to_ascii
-from synapse.storage.engines import PostgresEngine
+from collections import namedtuple
+import logging
 
 from twisted.internet import defer
-from collections import namedtuple
 
-import logging
+from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage.engines import PostgresEngine
+from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
+from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.dictionary_cache import DictionaryCache
+from synapse.util.stringutils import to_ascii
+from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
@@ -40,23 +42,11 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt
         return len(self.delta_ids) if self.delta_ids else 0
 
 
-class StateStore(SQLBaseStore):
-    """ Keeps track of the state at a given event.
+class StateGroupReadStore(SQLBaseStore):
+    """The read-only parts of StateGroupStore
 
-    This is done by the concept of `state groups`. Every event is a assigned
-    a state group (identified by an arbitrary string), which references a
-    collection of state events. The current state of an event is then the
-    collection of state events referenced by the event's state group.
-
-    Hence, every change in the current state causes a new state group to be
-    generated. However, if no change happens (e.g., if we get a message event
-    with only one parent it inherits the state group from its parent.)
-
-    There are three tables:
-      * `state_groups`: Stores group name, first event with in the group and
-        room id.
-      * `event_to_state_groups`: Maps events to state groups.
-      * `state_groups_state`: Maps state group to state events.
+    None of these functions write to the state tables, so are suitable for
+    including in the SlavedStores.
     """
 
     STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
@@ -64,21 +54,10 @@ class StateStore(SQLBaseStore):
     CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
 
     def __init__(self, db_conn, hs):
-        super(StateStore, self).__init__(db_conn, hs)
-        self.register_background_update_handler(
-            self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
-            self._background_deduplicate_state,
-        )
-        self.register_background_update_handler(
-            self.STATE_GROUP_INDEX_UPDATE_NAME,
-            self._background_index_state,
-        )
-        self.register_background_index_update(
-            self.CURRENT_STATE_INDEX_UPDATE_NAME,
-            index_name="current_state_events_member_index",
-            table="current_state_events",
-            columns=["state_key"],
-            where_clause="type='m.room.member'",
+        super(StateGroupReadStore, self).__init__(db_conn, hs)
+
+        self._state_group_cache = DictionaryCache(
+            "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
         )
 
     @cached(max_entries=100000, iterable=True)
@@ -190,178 +169,6 @@ class StateStore(SQLBaseStore):
             for group, event_id_map in group_to_ids.iteritems()
         })
 
-    def _have_persisted_state_group_txn(self, txn, state_group):
-        txn.execute(
-            "SELECT count(*) FROM state_groups WHERE id = ?",
-            (state_group,)
-        )
-        row = txn.fetchone()
-        return row and row[0]
-
-    def _store_mult_state_groups_txn(self, txn, events_and_contexts):
-        state_groups = {}
-        for event, context in events_and_contexts:
-            if event.internal_metadata.is_outlier():
-                continue
-
-            if context.current_state_ids is None:
-                # AFAIK, this can never happen
-                logger.error(
-                    "Non-outlier event %s had current_state_ids==None",
-                    event.event_id)
-                continue
-
-            # if the event was rejected, just give it the same state as its
-            # predecessor.
-            if context.rejected:
-                state_groups[event.event_id] = context.prev_group
-                continue
-
-            state_groups[event.event_id] = context.state_group
-
-            if self._have_persisted_state_group_txn(txn, context.state_group):
-                continue
-
-            self._simple_insert_txn(
-                txn,
-                table="state_groups",
-                values={
-                    "id": context.state_group,
-                    "room_id": event.room_id,
-                    "event_id": event.event_id,
-                },
-            )
-
-            # We persist as a delta if we can, while also ensuring the chain
-            # of deltas isn't tooo long, as otherwise read performance degrades.
-            if context.prev_group:
-                is_in_db = self._simple_select_one_onecol_txn(
-                    txn,
-                    table="state_groups",
-                    keyvalues={"id": context.prev_group},
-                    retcol="id",
-                    allow_none=True,
-                )
-                if not is_in_db:
-                    raise Exception(
-                        "Trying to persist state with unpersisted prev_group: %r"
-                        % (context.prev_group,)
-                    )
-
-                potential_hops = self._count_state_group_hops_txn(
-                    txn, context.prev_group
-                )
-            if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
-                self._simple_insert_txn(
-                    txn,
-                    table="state_group_edges",
-                    values={
-                        "state_group": context.state_group,
-                        "prev_state_group": context.prev_group,
-                    },
-                )
-
-                self._simple_insert_many_txn(
-                    txn,
-                    table="state_groups_state",
-                    values=[
-                        {
-                            "state_group": context.state_group,
-                            "room_id": event.room_id,
-                            "type": key[0],
-                            "state_key": key[1],
-                            "event_id": state_id,
-                        }
-                        for key, state_id in context.delta_ids.iteritems()
-                    ],
-                )
-            else:
-                self._simple_insert_many_txn(
-                    txn,
-                    table="state_groups_state",
-                    values=[
-                        {
-                            "state_group": context.state_group,
-                            "room_id": event.room_id,
-                            "type": key[0],
-                            "state_key": key[1],
-                            "event_id": state_id,
-                        }
-                        for key, state_id in context.current_state_ids.iteritems()
-                    ],
-                )
-
-            # Prefill the state group cache with this group.
-            # It's fine to use the sequence like this as the state group map
-            # is immutable. (If the map wasn't immutable then this prefill could
-            # race with another update)
-            txn.call_after(
-                self._state_group_cache.update,
-                self._state_group_cache.sequence,
-                key=context.state_group,
-                value=dict(context.current_state_ids),
-                full=True,
-            )
-
-        self._simple_insert_many_txn(
-            txn,
-            table="event_to_state_groups",
-            values=[
-                {
-                    "state_group": state_group_id,
-                    "event_id": event_id,
-                }
-                for event_id, state_group_id in state_groups.iteritems()
-            ],
-        )
-
-        for event_id, state_group_id in state_groups.iteritems():
-            txn.call_after(
-                self._get_state_group_for_event.prefill,
-                (event_id,), state_group_id
-            )
-
-    def _count_state_group_hops_txn(self, txn, state_group):
-        """Given a state group, count how many hops there are in the tree.
-
-        This is used to ensure the delta chains don't get too long.
-        """
-        if isinstance(self.database_engine, PostgresEngine):
-            sql = ("""
-                WITH RECURSIVE state(state_group) AS (
-                    VALUES(?::bigint)
-                    UNION ALL
-                    SELECT prev_state_group FROM state_group_edges e, state s
-                    WHERE s.state_group = e.state_group
-                )
-                SELECT count(*) FROM state;
-            """)
-
-            txn.execute(sql, (state_group,))
-            row = txn.fetchone()
-            if row and row[0]:
-                return row[0]
-            else:
-                return 0
-        else:
-            # We don't use WITH RECURSIVE on sqlite3 as there are distributions
-            # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
-            next_group = state_group
-            count = 0
-
-            while next_group:
-                next_group = self._simple_select_one_onecol_txn(
-                    txn,
-                    table="state_group_edges",
-                    keyvalues={"state_group": next_group},
-                    retcol="prev_state_group",
-                    allow_none=True,
-                )
-                if next_group:
-                    count += 1
-
-            return count
-
     @defer.inlineCallbacks
     def _get_state_groups_from_groups(self, groups, types):
         """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
@@ -742,6 +549,220 @@ class StateStore(SQLBaseStore):
 
         defer.returnValue(results)
 
+
+class StateStore(StateGroupReadStore, BackgroundUpdateStore):
+    """ Keeps track of the state at a given event.
+
+    This is done by the concept of `state groups`. Every event is a assigned
+    a state group (identified by an arbitrary string), which references a
+    collection of state events. The current state of an event is then the
+    collection of state events referenced by the event's state group.
+
+    Hence, every change in the current state causes a new state group to be
+    generated. However, if no change happens (e.g., if we get a message event
+    with only one parent it inherits the state group from its parent.)
+
+    There are three tables:
+      * `state_groups`: Stores group name, first event with in the group and
+        room id.
+      * `event_to_state_groups`: Maps events to state groups.
+      * `state_groups_state`: Maps state group to state events.
+    """
+
+    STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
+    STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
+    CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
+
+    def __init__(self, db_conn, hs):
+        super(StateStore, self).__init__(db_conn, hs)
+        self.register_background_update_handler(
+            self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
+            self._background_deduplicate_state,
+        )
+        self.register_background_update_handler(
+            self.STATE_GROUP_INDEX_UPDATE_NAME,
+            self._background_index_state,
+        )
+        self.register_background_index_update(
+            self.CURRENT_STATE_INDEX_UPDATE_NAME,
+            index_name="current_state_events_member_index",
+            table="current_state_events",
+            columns=["state_key"],
+            where_clause="type='m.room.member'",
+        )
+
+    def _have_persisted_state_group_txn(self, txn, state_group):
+        txn.execute(
+            "SELECT count(*) FROM state_groups WHERE id = ?",
+            (state_group,)
+        )
+        row = txn.fetchone()
+        return row and row[0]
+
+    def _store_mult_state_groups_txn(self, txn, events_and_contexts):
+        state_groups = {}
+        for event, context in events_and_contexts:
+            if event.internal_metadata.is_outlier():
+                continue
+
+            if context.current_state_ids is None:
+                # AFAIK, this can never happen
+                logger.error(
+                    "Non-outlier event %s had current_state_ids==None",
+                    event.event_id)
+                continue
+
+            # if the event was rejected, just give it the same state as its
+            # predecessor.
+            if context.rejected:
+                state_groups[event.event_id] = context.prev_group
+                continue
+
+            state_groups[event.event_id] = context.state_group
+
+            if self._have_persisted_state_group_txn(txn, context.state_group):
+                continue
+
+            self._simple_insert_txn(
+                txn,
+                table="state_groups",
+                values={
+                    "id": context.state_group,
+                    "room_id": event.room_id,
+                    "event_id": event.event_id,
+                },
+            )
+
+            # We persist as a delta if we can, while also ensuring the chain
+            # of deltas isn't tooo long, as otherwise read performance degrades.
+            if context.prev_group:
+                is_in_db = self._simple_select_one_onecol_txn(
+                    txn,
+                    table="state_groups",
+                    keyvalues={"id": context.prev_group},
+                    retcol="id",
+                    allow_none=True,
+                )
+                if not is_in_db:
+                    raise Exception(
+                        "Trying to persist state with unpersisted prev_group: %r"
+                        % (context.prev_group,)
+                    )
+
+                potential_hops = self._count_state_group_hops_txn(
+                    txn, context.prev_group
+                )
+            if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
+                self._simple_insert_txn(
+                    txn,
+                    table="state_group_edges",
+                    values={
+                        "state_group": context.state_group,
+                        "prev_state_group": context.prev_group,
+                    },
+                )
+
+                self._simple_insert_many_txn(
+                    txn,
+                    table="state_groups_state",
+                    values=[
+                        {
+                            "state_group": context.state_group,
+                            "room_id": event.room_id,
+                            "type": key[0],
+                            "state_key": key[1],
+                            "event_id": state_id,
+                        }
+                        for key, state_id in context.delta_ids.iteritems()
+                    ],
+                )
+            else:
+                self._simple_insert_many_txn(
+                    txn,
+                    table="state_groups_state",
+                    values=[
+                        {
+                            "state_group": context.state_group,
+                            "room_id": event.room_id,
+                            "type": key[0],
+                            "state_key": key[1],
+                            "event_id": state_id,
+                        }
+                        for key, state_id in context.current_state_ids.iteritems()
+                    ],
+                )
+
+            # Prefill the state group cache with this group.
+            # It's fine to use the sequence like this as the state group map
+            # is immutable. (If the map wasn't immutable then this prefill could
+            # race with another update)
+            txn.call_after(
+                self._state_group_cache.update,
+                self._state_group_cache.sequence,
+                key=context.state_group,
+                value=dict(context.current_state_ids),
+                full=True,
+            )
+
+        self._simple_insert_many_txn(
+            txn,
+            table="event_to_state_groups",
+            values=[
+                {
+                    "state_group": state_group_id,
+                    "event_id": event_id,
+                }
+                for event_id, state_group_id in state_groups.iteritems()
+            ],
+        )
+
+        for event_id, state_group_id in state_groups.iteritems():
+            txn.call_after(
+                self._get_state_group_for_event.prefill,
+                (event_id,), state_group_id
+            )
+
+    def _count_state_group_hops_txn(self, txn, state_group):
+        """Given a state group, count how many hops there are in the tree.
+
+        This is used to ensure the delta chains don't get too long.
+        """
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = ("""
+                WITH RECURSIVE state(state_group) AS (
+                    VALUES(?::bigint)
+                    UNION ALL
+                    SELECT prev_state_group FROM state_group_edges e, state s
+                    WHERE s.state_group = e.state_group
+                )
+                SELECT count(*) FROM state;
+            """)
+
+            txn.execute(sql, (state_group,))
+            row = txn.fetchone()
+            if row and row[0]:
+                return row[0]
+            else:
+                return 0
+        else:
+            # We don't use WITH RECURSIVE on sqlite3 as there are distributions
+            # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
+            next_group = state_group
+            count = 0
+
+            while next_group:
+                next_group = self._simple_select_one_onecol_txn(
+                    txn,
+                    table="state_group_edges",
+                    keyvalues={"state_group": next_group},
+                    retcol="prev_state_group",
+                    allow_none=True,
+                )
+                if next_group:
+                    count += 1
+
+            return count
+
     def get_next_state_group(self):
         return self._state_groups_id_gen.get_next()
 
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index dddd5fc0e7..52bdce5be2 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -39,7 +39,7 @@ from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached
 from synapse.api.constants import EventTypes
 from synapse.types import RoomStreamToken
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
 import logging
@@ -234,7 +234,7 @@ class StreamStore(SQLBaseStore):
         results = {}
         room_ids = list(room_ids)
         for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
-            res = yield preserve_context_over_deferred(defer.gatherResults([
+            res = yield make_deferred_yieldable(defer.gatherResults([
                 preserve_fn(self.get_room_events_stream_for_room)(
                     room_id, from_key, to_key, limit, order=order,
                 )
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 5dc5b9582a..c9bff408ef 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -164,7 +164,7 @@ class UserDirectoryStore(SQLBaseStore):
             )
 
             if isinstance(self.database_engine, PostgresEngine):
-                # We weight the loclpart most highly, then display name and finally
+                # We weight the localpart most highly, then display name and finally
                 # server name
                 if new_entry:
                     sql = """
@@ -317,6 +317,16 @@ class UserDirectoryStore(SQLBaseStore):
         rows = yield self._execute("get_all_rooms", None, sql)
         defer.returnValue([room_id for room_id, in rows])
 
+    @defer.inlineCallbacks
+    def get_all_local_users(self):
+        """Get all local users
+        """
+        sql = """
+            SELECT name FROM users
+        """
+        rows = yield self._execute("get_all_local_users", None, sql)
+        defer.returnValue([name for name, in rows])
+
     def add_users_who_share_room(self, room_id, share_private, user_id_tuples):
         """Insert entries into the users_who_share_rooms table. The first
         user should be a local user.
@@ -629,6 +639,20 @@ class UserDirectoryStore(SQLBaseStore):
                     ]
                 }
         """
+
+        if self.hs.config.user_directory_search_all_users:
+            join_clause = ""
+            where_clause = "?<>''"  # naughty hack to keep the same number of binds
+        else:
+            join_clause = """
+                LEFT JOIN users_in_public_rooms AS p USING (user_id)
+                LEFT JOIN (
+                    SELECT other_user_id AS user_id FROM users_who_share_rooms
+                    WHERE user_id = ? AND share_private
+                ) AS s USING (user_id)
+            """
+            where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)"
+
         if isinstance(self.database_engine, PostgresEngine):
             full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
 
@@ -641,13 +665,9 @@ class UserDirectoryStore(SQLBaseStore):
                 SELECT d.user_id, display_name, avatar_url
                 FROM user_directory_search
                 INNER JOIN user_directory AS d USING (user_id)
-                LEFT JOIN users_in_public_rooms AS p USING (user_id)
-                LEFT JOIN (
-                    SELECT other_user_id AS user_id FROM users_who_share_rooms
-                    WHERE user_id = ? AND share_private
-                ) AS s USING (user_id)
+                %s
                 WHERE
-                    (s.user_id IS NOT NULL OR p.user_id IS NOT NULL)
+                    %s
                     AND vector @@ to_tsquery('english', ?)
                 ORDER BY
                     (CASE WHEN s.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
@@ -671,7 +691,7 @@ class UserDirectoryStore(SQLBaseStore):
                     display_name IS NULL,
                     avatar_url IS NULL
                 LIMIT ?
-            """
+            """ % (join_clause, where_clause)
             args = (user_id, full_query, exact_query, prefix_query, limit + 1,)
         elif isinstance(self.database_engine, Sqlite3Engine):
             search_query = _parse_query_sqlite(search_term)
@@ -680,20 +700,16 @@ class UserDirectoryStore(SQLBaseStore):
                 SELECT d.user_id, display_name, avatar_url
                 FROM user_directory_search
                 INNER JOIN user_directory AS d USING (user_id)
-                LEFT JOIN users_in_public_rooms AS p USING (user_id)
-                LEFT JOIN (
-                    SELECT other_user_id AS user_id FROM users_who_share_rooms
-                    WHERE user_id = ? AND share_private
-                ) AS s USING (user_id)
+                %s
                 WHERE
-                    (s.user_id IS NOT NULL OR p.user_id IS NOT NULL)
+                    %s
                     AND value MATCH ?
                 ORDER BY
                     rank(matchinfo(user_directory_search)) DESC,
                     display_name IS NULL,
                     avatar_url IS NULL
                 LIMIT ?
-            """
+            """ % (join_clause, where_clause)
             args = (user_id, search_query, limit + 1)
         else:
             # This should be unreachable.
@@ -723,7 +739,7 @@ def _parse_query_sqlite(search_term):
 
     # Pull out the individual words, discarding any non-word characters.
     results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
-    return " & ".join("(%s* | %s)" % (result, result,) for result in results)
+    return " & ".join("(%s* OR %s)" % (result, result,) for result in results)
 
 
 def _parse_query_postgres(search_term):
diff --git a/synapse/util/async.py b/synapse/util/async.py
index e786fb38a9..0729bb2863 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -17,7 +17,7 @@
 from twisted.internet import defer, reactor
 
 from .logcontext import (
-    PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+    PreserveLoggingContext, make_deferred_yieldable, preserve_fn
 )
 from synapse.util import logcontext, unwrapFirstError
 
@@ -351,7 +351,7 @@ class ReadWriteLock(object):
 
         # We wait for the latest writer to finish writing. We can safely ignore
         # any existing readers... as they're readers.
-        yield curr_writer
+        yield make_deferred_yieldable(curr_writer)
 
         @contextmanager
         def _ctx_manager():
@@ -380,7 +380,7 @@ class ReadWriteLock(object):
         curr_readers.clear()
         self.key_to_current_writer[key] = new_defer
 
-        yield preserve_context_over_deferred(defer.gatherResults(to_wait_on))
+        yield make_deferred_yieldable(defer.gatherResults(to_wait_on))
 
         @contextmanager
         def _ctx_manager():
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index e68f94ce77..734331caaa 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -13,32 +13,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
 
-from synapse.util.logcontext import (
-    PreserveLoggingContext, preserve_context_over_fn
-)
+from twisted.internet import defer
 
 from synapse.util import unwrapFirstError
-
-import logging
-
+from synapse.util.logcontext import PreserveLoggingContext
 
 logger = logging.getLogger(__name__)
 
 
 def user_left_room(distributor, user, room_id):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "user_left_room", user=user, room_id=room_id
-    )
+    with PreserveLoggingContext():
+        distributor.fire("user_left_room", user=user, room_id=room_id)
 
 
 def user_joined_room(distributor, user, room_id):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "user_joined_room", user=user, room_id=room_id
-    )
+    with PreserveLoggingContext():
+        distributor.fire("user_joined_room", user=user, room_id=room_id)
 
 
 class Distributor(object):
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 9683cc7265..48c9f6802d 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -261,67 +261,6 @@ class PreserveLoggingContext(object):
                 )
 
 
-class _PreservingContextDeferred(defer.Deferred):
-    """A deferred that ensures that all callbacks and errbacks are called with
-    the given logging context.
-    """
-    def __init__(self, context):
-        self._log_context = context
-        defer.Deferred.__init__(self)
-
-    def addCallbacks(self, callback, errback=None,
-                     callbackArgs=None, callbackKeywords=None,
-                     errbackArgs=None, errbackKeywords=None):
-        callback = self._wrap_callback(callback)
-        errback = self._wrap_callback(errback)
-        return defer.Deferred.addCallbacks(
-            self, callback,
-            errback=errback,
-            callbackArgs=callbackArgs,
-            callbackKeywords=callbackKeywords,
-            errbackArgs=errbackArgs,
-            errbackKeywords=errbackKeywords,
-        )
-
-    def _wrap_callback(self, f):
-        def g(res, *args, **kwargs):
-            with PreserveLoggingContext(self._log_context):
-                res = f(res, *args, **kwargs)
-            return res
-        return g
-
-
-def preserve_context_over_fn(fn, *args, **kwargs):
-    """Takes a function and invokes it with the given arguments, but removes
-    and restores the current logging context while doing so.
-
-    If the result is a deferred, call preserve_context_over_deferred before
-    returning it.
-    """
-    with PreserveLoggingContext():
-        res = fn(*args, **kwargs)
-
-    if isinstance(res, defer.Deferred):
-        return preserve_context_over_deferred(res)
-    else:
-        return res
-
-
-def preserve_context_over_deferred(deferred, context=None):
-    """Given a deferred wrap it such that any callbacks added later to it will
-    be invoked with the current context.
-
-    Deprecated: this almost certainly doesn't do want you want, ie make
-    the deferred follow the synapse logcontext rules: try
-    ``make_deferred_yieldable`` instead.
-    """
-    if context is None:
-        context = LoggingContext.current_context()
-    d = _PreservingContextDeferred(context)
-    deferred.chainDeferred(d)
-    return d
-
-
 def preserve_fn(f):
     """Wraps a function, to ensure that the current context is restored after
     return from the function, and that the sentinel context is set once the
diff --git a/synapse/visibility.py b/synapse/visibility.py
index d7dbdc77ff..aaca2c584c 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import Membership, EventTypes
 
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 
 import logging
 
@@ -58,7 +58,7 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state,
         always_include_ids (set(event_id)): set of event ids to specifically
             include (unless sender is ignored)
     """
-    forgotten = yield preserve_context_over_deferred(defer.gatherResults([
+    forgotten = yield make_deferred_yieldable(defer.gatherResults([
         defer.maybeDeferred(
             preserve_fn(store.who_forgot_in_room),
             room_id,