summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2017-11-30 01:51:38 +0000
committerMatthew Hodgson <matthew@matrix.org>2017-11-30 01:51:38 +0000
commitf397153dfc4020744e1cf8687abb01d4b7885a7a (patch)
tree342da238d1e7de6178731ad6a040bf9c3de16ce1 /synapse
parentspecify default user_directory_include_pattern (diff)
parentMerge pull request #2721 from matrix-org/rav/get_user_by_access_token_comments (diff)
downloadsynapse-f397153dfc4020744e1cf8687abb01d4b7885a7a.tar.xz
Merge branch 'develop' into matthew/search-all-local-users
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py6
-rwxr-xr-xsynapse/app/homeserver.py22
-rw-r--r--synapse/app/media_repository.py10
-rw-r--r--synapse/app/synchrotron.py3
-rw-r--r--synapse/config/server.py6
-rw-r--r--synapse/crypto/event_signing.py13
-rw-r--r--synapse/federation/transaction_queue.py10
-rw-r--r--synapse/handlers/auth.py48
-rw-r--r--synapse/handlers/deactivate_account.py52
-rw-r--r--synapse/handlers/device.py20
-rw-r--r--synapse/handlers/set_password.py56
-rw-r--r--synapse/http/endpoint.py6
-rw-r--r--synapse/http/server.py12
-rw-r--r--synapse/module_api/__init__.py14
-rw-r--r--synapse/notifier.py17
-rw-r--r--synapse/push/pusherpool.py24
-rw-r--r--synapse/replication/tcp/resource.py6
-rw-r--r--synapse/rest/client/v1/admin.py8
-rw-r--r--synapse/rest/client/v1/logout.py27
-rw-r--r--synapse/rest/client/v2_alpha/account.py10
-rw-r--r--synapse/rest/client/v2_alpha/groups.py22
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py16
-rw-r--r--synapse/server.py52
-rw-r--r--synapse/server.pyi18
-rw-r--r--synapse/storage/_base.py23
-rw-r--r--synapse/storage/account_data.py85
-rw-r--r--synapse/storage/background_updates.py35
-rw-r--r--synapse/storage/media_repository.py16
-rw-r--r--synapse/storage/registration.py10
-rw-r--r--synapse/storage/schema/delta/44/expire_url_cache.sql5
-rw-r--r--synapse/storage/schema/delta/46/local_media_repository_url_idx.sql24
31 files changed, 497 insertions, 179 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 72858cca1f..ac0a3655a5 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -270,7 +270,11 @@ class Auth(object):
             rights (str): The operation being performed; the access token must
                 allow this.
         Returns:
-            dict : dict that includes the user and the ID of their access token.
+            Deferred[dict]: dict that includes:
+               `user` (UserID)
+               `is_guest` (bool)
+               `token_id` (int|None): access token id. May be None if guest
+               `device_id` (str|None): device corresponding to access token
         Raises:
             AuthError if no user by that token exists or the token is invalid.
         """
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 9e26146338..6b8875afb4 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -43,7 +43,6 @@ from synapse.rest import ClientRestResource
 from synapse.rest.key.v1.server_key_resource import LocalKey
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
 from synapse.server import HomeServer
 from synapse.storage import are_all_users_on_domain
 from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
@@ -195,14 +194,19 @@ class SynapseHomeServer(HomeServer):
             })
 
         if name in ["media", "federation", "client"]:
-            media_repo = MediaRepositoryResource(self)
-            resources.update({
-                MEDIA_PREFIX: media_repo,
-                LEGACY_MEDIA_PREFIX: media_repo,
-                CONTENT_REPO_PREFIX: ContentRepoResource(
-                    self, self.config.uploads_path
-                ),
-            })
+            if self.get_config().enable_media_repo:
+                media_repo = self.get_media_repository_resource()
+                resources.update({
+                    MEDIA_PREFIX: media_repo,
+                    LEGACY_MEDIA_PREFIX: media_repo,
+                    CONTENT_REPO_PREFIX: ContentRepoResource(
+                        self, self.config.uploads_path
+                    ),
+                })
+            elif name == "media":
+                raise ConfigError(
+                    "'media' resource conflicts with enable_media_repo=False",
+                )
 
         if name in ["keys", "federation"]:
             resources.update({
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 36c18bdbcb..c4e5f0965d 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -35,7 +35,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
@@ -89,7 +88,7 @@ class MediaRepositoryServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(self)
                 elif name == "media":
-                    media_repo = MediaRepositoryResource(self)
+                    media_repo = self.get_media_repository_resource()
                     resources.update({
                         MEDIA_PREFIX: media_repo,
                         LEGACY_MEDIA_PREFIX: media_repo,
@@ -151,6 +150,13 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.media_repository"
 
+    if config.enable_media_repo:
+        _base.quit_with_error(
+            "enable_media_repo must be disabled in the main synapse process\n"
+            "before the media repo can be run in a separate worker.\n"
+            "Please add ``enable_media_repo: false`` to the main config\n"
+        )
+
     setup_logging(config, use_worker_options=True)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 576ac6fb7e..323fddee21 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -340,11 +340,10 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
         self.store = hs.get_datastore()
         self.typing_handler = hs.get_typing_handler()
+        # NB this is a SynchrotronPresence, not a normal PresenceHandler
         self.presence_handler = hs.get_presence_handler()
         self.notifier = hs.get_notifier()
 
-        self.presence_handler.sync_callback = self.send_user_sync
-
     def on_rdata(self, stream_name, token, rows):
         super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
 
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 4d9193536d..edb90a1348 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -41,6 +41,12 @@ class ServerConfig(Config):
         # false only if we are updating the user directory in a worker
         self.update_user_directory = config.get("update_user_directory", True)
 
+        # whether to enable the media repository endpoints. This should be set
+        # to false if the media repository is running as a separate endpoint;
+        # doing so ensures that we will not run cache cleanup jobs on the
+        # master, potentially causing inconsistency.
+        self.enable_media_repo = config.get("enable_media_repo", True)
+
         self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
 
         # Whether we should block invites sent to users on this server
diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py
index 0d0e7b5286..aaa3efaca3 100644
--- a/synapse/crypto/event_signing.py
+++ b/synapse/crypto/event_signing.py
@@ -32,15 +32,22 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
     """Check whether the hash for this PDU matches the contents"""
     name, expected_hash = compute_content_hash(event, hash_algorithm)
     logger.debug("Expecting hash: %s", encode_base64(expected_hash))
-    if name not in event.hashes:
+
+    # some malformed events lack a 'hashes'. Protect against it being missing
+    # or a weird type by basically treating it the same as an unhashed event.
+    hashes = event.get("hashes")
+    if not isinstance(hashes, dict):
+        raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED)
+
+    if name not in hashes:
         raise SynapseError(
             400,
             "Algorithm %s not in hashes %s" % (
-                name, list(event.hashes),
+                name, list(hashes),
             ),
             Codes.UNAUTHORIZED,
         )
-    message_hash_base64 = event.hashes[name]
+    message_hash_base64 = hashes[name]
     try:
         message_hash_bytes = decode_base64(message_hash_base64)
     except Exception:
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 7a3c9cbb70..3e7809b04f 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -20,7 +20,7 @@ from .persistence import TransactionActions
 from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException
-from synapse.util import logcontext
+from synapse.util import logcontext, PreserveLoggingContext
 from synapse.util.async import run_on_reactor
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
@@ -146,7 +146,6 @@ class TransactionQueue(object):
         else:
             return not destination.startswith("localhost")
 
-    @defer.inlineCallbacks
     def notify_new_events(self, current_id):
         """This gets called when we have some new events we might want to
         send out to other servers.
@@ -156,6 +155,13 @@ class TransactionQueue(object):
         if self._is_processing:
             return
 
+        # fire off a processing loop in the background. It's likely it will
+        # outlast the current request, so run it in the sentinel logcontext.
+        with PreserveLoggingContext():
+            self._process_event_queue_loop()
+
+    @defer.inlineCallbacks
+    def _process_event_queue_loop(self):
         try:
             self._is_processing = True
             while True:
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 080eb14271..2f30f183ce 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -650,41 +650,6 @@ class AuthHandler(BaseHandler):
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
 
     @defer.inlineCallbacks
-    def set_password(self, user_id, newpassword, requester=None):
-        password_hash = self.hash(newpassword)
-
-        except_access_token_id = requester.access_token_id if requester else None
-
-        try:
-            yield self.store.user_set_password_hash(user_id, password_hash)
-        except StoreError as e:
-            if e.code == 404:
-                raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
-            raise e
-        yield self.delete_access_tokens_for_user(
-            user_id, except_token_id=except_access_token_id,
-        )
-        yield self.hs.get_pusherpool().remove_pushers_by_user(
-            user_id, except_access_token_id
-        )
-
-    @defer.inlineCallbacks
-    def deactivate_account(self, user_id):
-        """Deactivate a user's account
-
-        Args:
-            user_id (str): ID of user to be deactivated
-
-        Returns:
-            Deferred
-        """
-        # FIXME: Theoretically there is a race here wherein user resets
-        # password using threepid.
-        yield self.delete_access_tokens_for_user(user_id)
-        yield self.store.user_delete_threepids(user_id)
-        yield self.store.user_set_password_hash(user_id, None)
-
-    @defer.inlineCallbacks
     def delete_access_token(self, access_token):
         """Invalidate a single access token
 
@@ -706,6 +671,12 @@ class AuthHandler(BaseHandler):
                     access_token=access_token,
                 )
 
+        # delete pushers associated with this access token
+        if user_info["token_id"] is not None:
+            yield self.hs.get_pusherpool().remove_pushers_by_access_token(
+                str(user_info["user"]), (user_info["token_id"], )
+            )
+
     @defer.inlineCallbacks
     def delete_access_tokens_for_user(self, user_id, except_token_id=None,
                                       device_id=None):
@@ -728,13 +699,18 @@ class AuthHandler(BaseHandler):
         # see if any of our auth providers want to know about this
         for provider in self.password_providers:
             if hasattr(provider, "on_logged_out"):
-                for token, device_id in tokens_and_devices:
+                for token, token_id, device_id in tokens_and_devices:
                     yield provider.on_logged_out(
                         user_id=user_id,
                         device_id=device_id,
                         access_token=token,
                     )
 
+        # delete pushers associated with the access tokens
+        yield self.hs.get_pusherpool().remove_pushers_by_access_token(
+            user_id, (token_id for _, token_id, _ in tokens_and_devices),
+        )
+
     @defer.inlineCallbacks
     def add_threepid(self, user_id, medium, address, validated_at):
         # 'Canonicalise' email addresses down to lower case.
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
new file mode 100644
index 0000000000..b1d3814909
--- /dev/null
+++ b/synapse/handlers/deactivate_account.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from ._base import BaseHandler
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class DeactivateAccountHandler(BaseHandler):
+    """Handler which deals with deactivating user accounts."""
+    def __init__(self, hs):
+        super(DeactivateAccountHandler, self).__init__(hs)
+        self._auth_handler = hs.get_auth_handler()
+        self._device_handler = hs.get_device_handler()
+
+    @defer.inlineCallbacks
+    def deactivate_account(self, user_id):
+        """Deactivate a user's account
+
+        Args:
+            user_id (str): ID of user to be deactivated
+
+        Returns:
+            Deferred
+        """
+        # FIXME: Theoretically there is a race here wherein user resets
+        # password using threepid.
+
+        # first delete any devices belonging to the user, which will also
+        # delete corresponding access tokens.
+        yield self._device_handler.delete_all_devices_for_user(user_id)
+        # then delete any remaining access tokens which weren't associated with
+        # a device.
+        yield self._auth_handler.delete_access_tokens_for_user(user_id)
+
+        yield self.store.user_delete_threepids(user_id)
+        yield self.store.user_set_password_hash(user_id, None)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 579d8477ba..2152efc692 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -171,12 +171,30 @@ class DeviceHandler(BaseHandler):
         yield self.notify_device_update(user_id, [device_id])
 
     @defer.inlineCallbacks
+    def delete_all_devices_for_user(self, user_id, except_device_id=None):
+        """Delete all of the user's devices
+
+        Args:
+            user_id (str):
+            except_device_id (str|None): optional device id which should not
+                be deleted
+
+        Returns:
+            defer.Deferred:
+        """
+        device_map = yield self.store.get_devices_by_user(user_id)
+        device_ids = device_map.keys()
+        if except_device_id is not None:
+            device_ids = [d for d in device_ids if d != except_device_id]
+        yield self.delete_devices(user_id, device_ids)
+
+    @defer.inlineCallbacks
     def delete_devices(self, user_id, device_ids):
         """ Delete several devices
 
         Args:
             user_id (str):
-            device_ids (str): The list of device IDs to delete
+            device_ids (List[str]): The list of device IDs to delete
 
         Returns:
             defer.Deferred:
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
new file mode 100644
index 0000000000..44414e1dc1
--- /dev/null
+++ b/synapse/handlers/set_password.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import Codes, StoreError, SynapseError
+from ._base import BaseHandler
+
+logger = logging.getLogger(__name__)
+
+
+class SetPasswordHandler(BaseHandler):
+    """Handler which deals with changing user account passwords"""
+    def __init__(self, hs):
+        super(SetPasswordHandler, self).__init__(hs)
+        self._auth_handler = hs.get_auth_handler()
+        self._device_handler = hs.get_device_handler()
+
+    @defer.inlineCallbacks
+    def set_password(self, user_id, newpassword, requester=None):
+        password_hash = self._auth_handler.hash(newpassword)
+
+        except_device_id = requester.device_id if requester else None
+        except_access_token_id = requester.access_token_id if requester else None
+
+        try:
+            yield self.store.user_set_password_hash(user_id, password_hash)
+        except StoreError as e:
+            if e.code == 404:
+                raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
+            raise e
+
+        # we want to log out all of the user's other sessions. First delete
+        # all his other devices.
+        yield self._device_handler.delete_all_devices_for_user(
+            user_id, except_device_id=except_device_id,
+        )
+
+        # and now delete any access tokens which weren't associated with
+        # devices (or were associated with this device).
+        yield self._auth_handler.delete_access_tokens_for_user(
+            user_id, except_token_id=except_access_token_id,
+        )
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index a97532162f..e2b99ef3bd 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -362,8 +362,10 @@ def _get_hosts_for_srv_record(dns_client, host):
         return res
 
     # no logcontexts here, so we can safely fire these off and gatherResults
-    d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb)
-    d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb)
+    d1 = dns_client.lookupAddress(host).addCallbacks(
+        cb, eb, errbackArgs=("A", ))
+    d2 = dns_client.lookupIPV6Address(host).addCallbacks(
+        cb, eb, errbackArgs=("AAAA", ))
     results = yield defer.DeferredList(
         [d1, d2], consumeErrors=True)
 
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 3ca1c9947c..25466cd292 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -28,6 +28,7 @@ from canonicaljson import (
 )
 
 from twisted.internet import defer
+from twisted.python import failure
 from twisted.web import server, resource
 from twisted.web.server import NOT_DONE_YET
 from twisted.web.util import redirectTo
@@ -131,12 +132,17 @@ def wrap_request_handler(request_handler, include_metrics=False):
                             version_string=self.version_string,
                         )
                     except Exception:
-                        logger.exception(
-                            "Failed handle request %s.%s on %r: %r",
+                        # failure.Failure() fishes the original Failure out
+                        # of our stack, and thus gives us a sensible stack
+                        # trace.
+                        f = failure.Failure()
+                        logger.error(
+                            "Failed handle request %s.%s on %r: %r: %s",
                             request_handler.__module__,
                             request_handler.__name__,
                             self,
-                            request
+                            request,
+                            f.getTraceback().rstrip(),
                         )
                         respond_with_json(
                             request,
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index dc680ddf43..097c844d31 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from twisted.internet import defer
 
 from synapse.types import UserID
 
@@ -81,6 +82,7 @@ class ModuleApi(object):
         reg = self.hs.get_handlers().registration_handler
         return reg.register(localpart=localpart)
 
+    @defer.inlineCallbacks
     def invalidate_access_token(self, access_token):
         """Invalidate an access token for a user
 
@@ -94,8 +96,16 @@ class ModuleApi(object):
         Raises:
             synapse.api.errors.AuthError: the access token is invalid
         """
-
-        return self._auth_handler.delete_access_token(access_token)
+        # see if the access token corresponds to a device
+        user_info = yield self._auth.get_user_by_access_token(access_token)
+        device_id = user_info.get("device_id")
+        user_id = user_info["user"].to_string()
+        if device_id:
+            # delete the device, which will also delete its access tokens
+            yield self.hs.get_device_handler().delete_device(user_id, device_id)
+        else:
+            # no associated device. Just delete the access token.
+            yield self._auth_handler.delete_access_token(access_token)
 
     def run_db_interaction(self, desc, func, *args, **kwargs):
         """Run a function with a database connection
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 626da778cd..ef042681bc 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -255,9 +255,7 @@ class Notifier(object):
         )
 
         if self.federation_sender:
-            preserve_fn(self.federation_sender.notify_new_events)(
-                room_stream_id
-            )
+            self.federation_sender.notify_new_events(room_stream_id)
 
         if event.type == EventTypes.Member and event.membership == Membership.JOIN:
             self._user_joined_room(event.state_key, event.room_id)
@@ -297,8 +295,7 @@ class Notifier(object):
     def on_new_replication_data(self):
         """Used to inform replication listeners that something has happend
         without waking up any of the normal user event streams"""
-        with PreserveLoggingContext():
-            self.notify_replication()
+        self.notify_replication()
 
     @defer.inlineCallbacks
     def wait_for_events(self, user_id, timeout, callback, room_ids=None,
@@ -516,8 +513,14 @@ class Notifier(object):
             self.replication_deferred = ObservableDeferred(defer.Deferred())
             deferred.callback(None)
 
-        for cb in self.replication_callbacks:
-            preserve_fn(cb)()
+            # the callbacks may well outlast the current request, so we run
+            # them in the sentinel logcontext.
+            #
+            # (ideally it would be up to the callbacks to know if they were
+            # starting off background processes and drop the logcontext
+            # accordingly, but that requires more changes)
+            for cb in self.replication_callbacks:
+                cb()
 
     @defer.inlineCallbacks
     def wait_for_replication(self, callback, timeout):
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 34cb108dcb..134e89b371 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -103,19 +103,25 @@ class PusherPool:
                 yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
 
     @defer.inlineCallbacks
-    def remove_pushers_by_user(self, user_id, except_access_token_id=None):
-        all = yield self.store.get_all_pushers()
-        logger.info(
-            "Removing all pushers for user %s except access tokens id %r",
-            user_id, except_access_token_id
-        )
-        for p in all:
-            if p['user_name'] == user_id and p['access_token'] != except_access_token_id:
+    def remove_pushers_by_access_token(self, user_id, access_tokens):
+        """Remove the pushers for a given user corresponding to a set of
+        access_tokens.
+
+        Args:
+            user_id (str): user to remove pushers for
+            access_tokens (Iterable[int]): access token *ids* to remove pushers
+                for
+        """
+        tokens = set(access_tokens)
+        for p in (yield self.store.get_pushers_by_user_id(user_id)):
+            if p['access_token'] in tokens:
                 logger.info(
                     "Removing pusher for app id %s, pushkey %s, user %s",
                     p['app_id'], p['pushkey'], p['user_name']
                 )
-                yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+                yield self.remove_pusher(
+                    p['app_id'], p['pushkey'], p['user_name'],
+                )
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_id, max_stream_id):
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 1d03e79b85..786c3fe864 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -216,11 +216,12 @@ class ReplicationStreamer(object):
             self.federation_sender.federation_ack(token)
 
     @measure_func("repl.on_user_sync")
+    @defer.inlineCallbacks
     def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
         """A client has started/stopped syncing on a worker.
         """
         user_sync_counter.inc()
-        self.presence_handler.update_external_syncs_row(
+        yield self.presence_handler.update_external_syncs_row(
             conn_id, user_id, is_syncing, last_sync_ms,
         )
 
@@ -244,11 +245,12 @@ class ReplicationStreamer(object):
         getattr(self.store, cache_func).invalidate(tuple(keys))
 
     @measure_func("repl.on_user_ip")
+    @defer.inlineCallbacks
     def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
         """The client saw a user request
         """
         user_ip_cache_counter.inc()
-        self.store.insert_client_ip(
+        yield self.store.insert_client_ip(
             user_id, access_token, ip, user_agent, device_id, last_seen,
         )
 
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 1197158fdc..5022808ea9 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -137,8 +137,8 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns("/admin/deactivate/(?P<target_user_id>[^/]*)")
 
     def __init__(self, hs):
-        self._auth_handler = hs.get_auth_handler()
         super(DeactivateAccountRestServlet, self).__init__(hs)
+        self._deactivate_account_handler = hs.get_deactivate_account_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request, target_user_id):
@@ -149,7 +149,7 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
         if not is_admin:
             raise AuthError(403, "You are not a server admin")
 
-        yield self._auth_handler.deactivate_account(target_user_id)
+        yield self._deactivate_account_handler.deactivate_account(target_user_id)
         defer.returnValue((200, {}))
 
 
@@ -309,7 +309,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
         super(ResetPasswordRestServlet, self).__init__(hs)
         self.hs = hs
         self.auth = hs.get_auth()
-        self.auth_handler = hs.get_auth_handler()
+        self._set_password_handler = hs.get_set_password_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request, target_user_id):
@@ -330,7 +330,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
 
         logger.info("new_password: %r", new_password)
 
-        yield self.auth_handler.set_password(
+        yield self._set_password_handler.set_password(
             target_user_id, new_password, requester
         )
         defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py
index 6add754782..ca49955935 100644
--- a/synapse/rest/client/v1/logout.py
+++ b/synapse/rest/client/v1/logout.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.auth import get_access_token_from_request
+from synapse.api.errors import AuthError
 
 from .base import ClientV1RestServlet, client_path_patterns
 
@@ -30,15 +31,30 @@ class LogoutRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(LogoutRestServlet, self).__init__(hs)
+        self._auth = hs.get_auth()
         self._auth_handler = hs.get_auth_handler()
+        self._device_handler = hs.get_device_handler()
 
     def on_OPTIONS(self, request):
         return (200, {})
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        access_token = get_access_token_from_request(request)
-        yield self._auth_handler.delete_access_token(access_token)
+        try:
+            requester = yield self.auth.get_user_by_req(request)
+        except AuthError:
+            # this implies the access token has already been deleted.
+            pass
+        else:
+            if requester.device_id is None:
+                # the acccess token wasn't associated with a device.
+                # Just delete the access token
+                access_token = get_access_token_from_request(request)
+                yield self._auth_handler.delete_access_token(access_token)
+            else:
+                yield self._device_handler.delete_device(
+                    requester.user.to_string(), requester.device_id)
+
         defer.returnValue((200, {}))
 
 
@@ -49,6 +65,7 @@ class LogoutAllRestServlet(ClientV1RestServlet):
         super(LogoutAllRestServlet, self).__init__(hs)
         self.auth = hs.get_auth()
         self._auth_handler = hs.get_auth_handler()
+        self._device_handler = hs.get_device_handler()
 
     def on_OPTIONS(self, request):
         return (200, {})
@@ -57,6 +74,12 @@ class LogoutAllRestServlet(ClientV1RestServlet):
     def on_POST(self, request):
         requester = yield self.auth.get_user_by_req(request)
         user_id = requester.user.to_string()
+
+        # first delete all of the user's devices
+        yield self._device_handler.delete_all_devices_for_user(user_id)
+
+        # .. and then delete any access tokens which weren't associated with
+        # devices.
         yield self._auth_handler.delete_access_tokens_for_user(user_id)
         defer.returnValue((200, {}))
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 726e0a2826..c26ce63bcf 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -98,6 +98,7 @@ class PasswordRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
         self.datastore = self.hs.get_datastore()
+        self._set_password_handler = hs.get_set_password_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request):
@@ -147,7 +148,7 @@ class PasswordRestServlet(RestServlet):
             raise SynapseError(400, "", Codes.MISSING_PARAM)
         new_password = params['new_password']
 
-        yield self.auth_handler.set_password(
+        yield self._set_password_handler.set_password(
             user_id, new_password, requester
         )
 
@@ -161,10 +162,11 @@ class DeactivateAccountRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/account/deactivate$")
 
     def __init__(self, hs):
+        super(DeactivateAccountRestServlet, self).__init__()
         self.hs = hs
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
-        super(DeactivateAccountRestServlet, self).__init__()
+        self._deactivate_account_handler = hs.get_deactivate_account_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request):
@@ -179,7 +181,7 @@ class DeactivateAccountRestServlet(RestServlet):
 
         # allow ASes to dectivate their own users
         if requester and requester.app_service:
-            yield self.auth_handler.deactivate_account(
+            yield self._deactivate_account_handler.deactivate_account(
                 requester.user.to_string()
             )
             defer.returnValue((200, {}))
@@ -206,7 +208,7 @@ class DeactivateAccountRestServlet(RestServlet):
             logger.error("Auth succeeded but no known type!", result.keys())
             raise SynapseError(500, "", Codes.UNKNOWN)
 
-        yield self.auth_handler.deactivate_account(user_id)
+        yield self._deactivate_account_handler.deactivate_account(user_id)
         defer.returnValue((200, {}))
 
 
diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py
index 089ec71c81..f762dbfa9a 100644
--- a/synapse/rest/client/v2_alpha/groups.py
+++ b/synapse/rest/client/v2_alpha/groups.py
@@ -38,7 +38,7 @@ class GroupServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         group_description = yield self.groups_handler.get_group_profile(
@@ -74,7 +74,7 @@ class GroupSummaryServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         get_group_summary = yield self.groups_handler.get_group_summary(
@@ -148,7 +148,7 @@ class GroupCategoryServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id, category_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         category = yield self.groups_handler.get_group_category(
@@ -200,7 +200,7 @@ class GroupCategoriesServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         category = yield self.groups_handler.get_group_categories(
@@ -225,7 +225,7 @@ class GroupRoleServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id, role_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         category = yield self.groups_handler.get_group_role(
@@ -277,7 +277,7 @@ class GroupRolesServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         category = yield self.groups_handler.get_group_roles(
@@ -348,7 +348,7 @@ class GroupRoomServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         result = yield self.groups_handler.get_rooms_in_group(group_id, requester_user_id)
@@ -369,7 +369,7 @@ class GroupUsersServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, group_id):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         result = yield self.groups_handler.get_users_in_group(group_id, requester_user_id)
@@ -672,7 +672,7 @@ class PublicisedGroupsForUserServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request, user_id):
-        yield self.auth.get_user_by_req(request)
+        yield self.auth.get_user_by_req(request, allow_guest=True)
 
         result = yield self.groups_handler.get_publicised_groups_for_user(
             user_id
@@ -697,7 +697,7 @@ class PublicisedGroupsForUsersServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        yield self.auth.get_user_by_req(request)
+        yield self.auth.get_user_by_req(request, allow_guest=True)
 
         content = parse_json_object_from_request(request)
         user_ids = content["user_ids"]
@@ -724,7 +724,7 @@ class GroupsForUserServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         requester_user_id = requester.user.to_string()
 
         result = yield self.groups_handler.get_joined_groups(requester_user_id)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 723f7043f4..40d2e664eb 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -25,7 +25,8 @@ from synapse.util.stringutils import random_string
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.http.client import SpiderHttpClient
 from synapse.http.server import (
-    request_handler, respond_with_json_bytes
+    request_handler, respond_with_json_bytes,
+    respond_with_json,
 )
 from synapse.util.async import ObservableDeferred
 from synapse.util.stringutils import is_ascii
@@ -78,6 +79,9 @@ class PreviewUrlResource(Resource):
             self._expire_url_cache_data, 10 * 1000
         )
 
+    def render_OPTIONS(self, request):
+        return respond_with_json(request, 200, {}, send_cors=True)
+
     def render_GET(self, request):
         self._async_render_GET(request)
         return NOT_DONE_YET
@@ -348,11 +352,16 @@ class PreviewUrlResource(Resource):
     def _expire_url_cache_data(self):
         """Clean up expired url cache content, media and thumbnails.
         """
-
         # TODO: Delete from backup media store
 
         now = self.clock.time_msec()
 
+        logger.info("Running url preview cache expiry")
+
+        if not (yield self.store.has_completed_background_updates()):
+            logger.info("Still running DB updates; skipping expiry")
+            return
+
         # First we delete expired url cache entries
         media_ids = yield self.store.get_expired_url_cache(now)
 
@@ -426,8 +435,7 @@ class PreviewUrlResource(Resource):
 
         yield self.store.delete_url_cache_media(removed_media)
 
-        if removed_media:
-            logger.info("Deleted %d media from url cache", len(removed_media))
+        logger.info("Deleted %d media from url cache", len(removed_media))
 
 
 def decode_and_calc_og(body, media_uri, request_encoding=None):
diff --git a/synapse/server.py b/synapse/server.py
index 6de33019d2..99693071b6 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -39,11 +39,13 @@ from synapse.federation.transaction_queue import TransactionQueue
 from synapse.handlers import Handlers
 from synapse.handlers.appservice import ApplicationServicesHandler
 from synapse.handlers.auth import AuthHandler, MacaroonGeneartor
+from synapse.handlers.deactivate_account import DeactivateAccountHandler
 from synapse.handlers.devicemessage import DeviceMessageHandler
 from synapse.handlers.device import DeviceHandler
 from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.room_list import RoomListHandler
+from synapse.handlers.set_password import SetPasswordHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
 from synapse.handlers.events import EventHandler, EventStreamHandler
@@ -60,7 +62,10 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
 from synapse.push.action_generator import ActionGenerator
 from synapse.push.pusherpool import PusherPool
-from synapse.rest.media.v1.media_repository import MediaRepository
+from synapse.rest.media.v1.media_repository import (
+    MediaRepository,
+    MediaRepositoryResource,
+)
 from synapse.state import StateHandler
 from synapse.storage import DataStore
 from synapse.streams.events import EventSources
@@ -90,17 +95,12 @@ class HomeServer(object):
     """
 
     DEPENDENCIES = [
-        'config',
-        'clock',
         'http_client',
         'db_pool',
-        'persistence_service',
         'replication_layer',
-        'datastore',
         'handlers',
         'v1auth',
         'auth',
-        'rest_servlet_factory',
         'state_handler',
         'presence_handler',
         'sync_handler',
@@ -117,19 +117,10 @@ class HomeServer(object):
         'application_service_handler',
         'device_message_handler',
         'profile_handler',
+        'deactivate_account_handler',
+        'set_password_handler',
         'notifier',
-        'distributor',
-        'client_resource',
-        'resource_for_federation',
-        'resource_for_static_content',
-        'resource_for_web_client',
-        'resource_for_content_repo',
-        'resource_for_server_key',
-        'resource_for_server_key_v2',
-        'resource_for_media_repository',
-        'resource_for_metrics',
         'event_sources',
-        'ratelimiter',
         'keyring',
         'pusherpool',
         'event_builder_factory',
@@ -137,6 +128,7 @@ class HomeServer(object):
         'http_client_context_factory',
         'simple_http_client',
         'media_repository',
+        'media_repository_resource',
         'federation_transport_client',
         'federation_sender',
         'receipts_handler',
@@ -183,6 +175,21 @@ class HomeServer(object):
     def is_mine_id(self, string):
         return string.split(":", 1)[1] == self.hostname
 
+    def get_clock(self):
+        return self.clock
+
+    def get_datastore(self):
+        return self.datastore
+
+    def get_config(self):
+        return self.config
+
+    def get_distributor(self):
+        return self.distributor
+
+    def get_ratelimiter(self):
+        return self.ratelimiter
+
     def build_replication_layer(self):
         return initialize_http_replication(self)
 
@@ -265,6 +272,12 @@ class HomeServer(object):
     def build_profile_handler(self):
         return ProfileHandler(self)
 
+    def build_deactivate_account_handler(self):
+        return DeactivateAccountHandler(self)
+
+    def build_set_password_handler(self):
+        return SetPasswordHandler(self)
+
     def build_event_sources(self):
         return EventSources(self)
 
@@ -294,6 +307,11 @@ class HomeServer(object):
             **self.db_config.get("args", {})
         )
 
+    def build_media_repository_resource(self):
+        # build the media repo resource. This indirects through the HomeServer
+        # to ensure that we only have a single instance of
+        return MediaRepositoryResource(self)
+
     def build_media_repository(self):
         return MediaRepository(self)
 
diff --git a/synapse/server.pyi b/synapse/server.pyi
index e8c0386b7f..41416ef252 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -3,10 +3,14 @@ import synapse.federation.transaction_queue
 import synapse.federation.transport.client
 import synapse.handlers
 import synapse.handlers.auth
+import synapse.handlers.deactivate_account
 import synapse.handlers.device
 import synapse.handlers.e2e_keys
-import synapse.storage
+import synapse.handlers.set_password
+import synapse.rest.media.v1.media_repository
 import synapse.state
+import synapse.storage
+
 
 class HomeServer(object):
     def get_auth(self) -> synapse.api.auth.Auth:
@@ -30,8 +34,20 @@ class HomeServer(object):
     def get_state_handler(self) -> synapse.state.StateHandler:
         pass
 
+    def get_deactivate_account_handler(self) -> synapse.handlers.deactivate_account.DeactivateAccountHandler:
+        pass
+
+    def get_set_password_handler(self) -> synapse.handlers.set_password.SetPasswordHandler:
+        pass
+
     def get_federation_sender(self) -> synapse.federation.transaction_queue.TransactionQueue:
         pass
 
     def get_federation_transport_client(self) -> synapse.federation.transport.client.TransportLayerClient:
         pass
+
+    def get_media_repository_resource(self) -> synapse.rest.media.v1.media_repository.MediaRepositoryResource:
+        pass
+
+    def get_media_repository(self) -> synapse.rest.media.v1.media_repository.MediaRepository:
+        pass
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 0fdf49a2fd..b971f0cb18 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -495,6 +495,7 @@ class SQLBaseStore(object):
             Deferred(bool): True if a new entry was created, False if an
                 existing one was updated.
         """
+        attempts = 0
         while True:
             try:
                 result = yield self.runInteraction(
@@ -504,6 +505,12 @@ class SQLBaseStore(object):
                 )
                 defer.returnValue(result)
             except self.database_engine.module.IntegrityError as e:
+                attempts += 1
+                if attempts >= 5:
+                    # don't retry forever, because things other than races
+                    # can cause IntegrityErrors
+                    raise
+
                 # presumably we raced with another transaction: let's retry.
                 logger.warn(
                     "IntegrityError when upserting into %s; retrying: %s",
@@ -600,20 +607,18 @@ class SQLBaseStore(object):
 
     @staticmethod
     def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
-        if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
-        else:
-            where = ""
-
         sql = (
-            "SELECT %(retcol)s FROM %(table)s %(where)s"
+            "SELECT %(retcol)s FROM %(table)s"
         ) % {
             "retcol": retcol,
             "table": table,
-            "where": where,
         }
 
-        txn.execute(sql, keyvalues.values())
+        if keyvalues:
+            sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
+            txn.execute(sql, keyvalues.values())
+        else:
+            txn.execute(sql)
 
         return [r[0] for r in txn]
 
@@ -624,7 +629,7 @@ class SQLBaseStore(object):
 
         Args:
             table (str): table name
-            keyvalues (dict): column names and values to select the rows with
+            keyvalues (dict|None): column names and values to select the rows with
             retcol (str): column whos value we wish to retrieve.
 
         Returns:
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index c8a1eb016b..56a0bde549 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore):
         """
         content_json = json.dumps(content)
 
-        def add_account_data_txn(txn, next_id):
-            self._simple_upsert_txn(
-                txn,
+        with self._account_data_id_gen.get_next() as next_id:
+            # no need to lock here as room_account_data has a unique constraint
+            # on (user_id, room_id, account_data_type) so _simple_upsert will
+            # retry if there is a conflict.
+            yield self._simple_upsert(
+                desc="add_room_account_data",
                 table="room_account_data",
                 keyvalues={
                     "user_id": user_id,
@@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore):
                 values={
                     "stream_id": next_id,
                     "content": content_json,
-                }
-            )
-            txn.call_after(
-                self._account_data_stream_cache.entity_has_changed,
-                user_id, next_id,
+                },
+                lock=False,
             )
-            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
-            self._update_max_stream_id(txn, next_id)
 
-        with self._account_data_id_gen.get_next() as next_id:
-            yield self.runInteraction(
-                "add_room_account_data", add_account_data_txn, next_id
-            )
+            # it's theoretically possible for the above to succeed and the
+            # below to fail - in which case we might reuse a stream id on
+            # restart, and the above update might not get propagated. That
+            # doesn't sound any worse than the whole update getting lost,
+            # which is what would happen if we combined the two into one
+            # transaction.
+            yield self._update_max_stream_id(next_id)
+
+            self._account_data_stream_cache.entity_has_changed(user_id, next_id)
+            self.get_account_data_for_user.invalidate((user_id,))
 
         result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
@@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore):
         """
         content_json = json.dumps(content)
 
-        def add_account_data_txn(txn, next_id):
-            self._simple_upsert_txn(
-                txn,
+        with self._account_data_id_gen.get_next() as next_id:
+            # no need to lock here as account_data has a unique constraint on
+            # (user_id, account_data_type) so _simple_upsert will retry if
+            # there is a conflict.
+            yield self._simple_upsert(
+                desc="add_user_account_data",
                 table="account_data",
                 keyvalues={
                     "user_id": user_id,
@@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore):
                 values={
                     "stream_id": next_id,
                     "content": content_json,
-                }
+                },
+                lock=False,
             )
-            txn.call_after(
-                self._account_data_stream_cache.entity_has_changed,
+
+            # it's theoretically possible for the above to succeed and the
+            # below to fail - in which case we might reuse a stream id on
+            # restart, and the above update might not get propagated. That
+            # doesn't sound any worse than the whole update getting lost,
+            # which is what would happen if we combined the two into one
+            # transaction.
+            yield self._update_max_stream_id(next_id)
+
+            self._account_data_stream_cache.entity_has_changed(
                 user_id, next_id,
             )
-            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
-            txn.call_after(
-                self.get_global_account_data_by_type_for_user.invalidate,
+            self.get_account_data_for_user.invalidate((user_id,))
+            self.get_global_account_data_by_type_for_user.invalidate(
                 (account_data_type, user_id,)
             )
-            self._update_max_stream_id(txn, next_id)
-
-        with self._account_data_id_gen.get_next() as next_id:
-            yield self.runInteraction(
-                "add_user_account_data", add_account_data_txn, next_id
-            )
 
         result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
-    def _update_max_stream_id(self, txn, next_id):
+    def _update_max_stream_id(self, next_id):
         """Update the max stream_id
 
         Args:
-            txn: The database cursor
             next_id(int): The the revision to advance to.
         """
-        update_max_id_sql = (
-            "UPDATE account_data_max_stream_id"
-            " SET stream_id = ?"
-            " WHERE stream_id < ?"
+        def _update(txn):
+            update_max_id_sql = (
+                "UPDATE account_data_max_stream_id"
+                " SET stream_id = ?"
+                " WHERE stream_id < ?"
+            )
+            txn.execute(update_max_id_sql, (next_id, next_id))
+        return self.runInteraction(
+            "update_account_data_max_stream_id",
+            _update,
         )
-        txn.execute(update_max_id_sql, (next_id, next_id))
 
     @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
     def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 6f235ac051..11a1b942f1 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -85,6 +85,7 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_performance = {}
         self._background_update_queue = []
         self._background_update_handlers = {}
+        self._all_done = False
 
     @defer.inlineCallbacks
     def start_doing_background_updates(self):
@@ -106,9 +107,41 @@ class BackgroundUpdateStore(SQLBaseStore):
                         "No more background updates to do."
                         " Unscheduling background update task."
                     )
+                    self._all_done = True
                     defer.returnValue(None)
 
     @defer.inlineCallbacks
+    def has_completed_background_updates(self):
+        """Check if all the background updates have completed
+
+        Returns:
+            Deferred[bool]: True if all background updates have completed
+        """
+        # if we've previously determined that there is nothing left to do, that
+        # is easy
+        if self._all_done:
+            defer.returnValue(True)
+
+        # obviously, if we have things in our queue, we're not done.
+        if self._background_update_queue:
+            defer.returnValue(False)
+
+        # otherwise, check if there are updates to be run. This is important,
+        # as we may be running on a worker which doesn't perform the bg updates
+        # itself, but still wants to wait for them to happen.
+        updates = yield self._simple_select_onecol(
+            "background_updates",
+            keyvalues=None,
+            retcol="1",
+            desc="check_background_updates",
+        )
+        if not updates:
+            self._all_done = True
+            defer.returnValue(True)
+
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
     def do_next_background_update(self, desired_duration_ms):
         """Does some amount of work on the next queued background update
 
@@ -269,7 +302,7 @@ class BackgroundUpdateStore(SQLBaseStore):
             # Sqlite doesn't support concurrent creation of indexes.
             #
             # We don't use partial indices on SQLite as it wasn't introduced
-            # until 3.8, and wheezy has 3.7
+            # until 3.8, and wheezy and CentOS 7 have 3.7
             #
             # We assume that sqlite doesn't give us invalid indices; however
             # we may still end up with the index existing but the
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 52e5cdad70..a66ff7c1e0 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -12,13 +12,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from synapse.storage.background_updates import BackgroundUpdateStore
 
-from ._base import SQLBaseStore
 
-
-class MediaRepositoryStore(SQLBaseStore):
+class MediaRepositoryStore(BackgroundUpdateStore):
     """Persistence for attachments and avatars"""
 
+    def __init__(self, db_conn, hs):
+        super(MediaRepositoryStore, self).__init__(db_conn, hs)
+
+        self.register_background_index_update(
+            update_name='local_media_repository_url_idx',
+            index_name='local_media_repository_url_idx',
+            table='local_media_repository',
+            columns=['created_ts'],
+            where_clause='url_cache IS NOT NULL',
+        )
+
     def get_default_thumbnails(self, top_level_type, sub_type):
         return []
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 8b9544c209..3aa810981f 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -254,8 +254,8 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
                 If None, tokens associated with any device (or no device) will
                 be deleted
         Returns:
-            defer.Deferred[list[str, str|None]]: a list of the deleted tokens
-                and device IDs
+            defer.Deferred[list[str, int, str|None, int]]: a list of
+                (token, token id, device id) for each of the deleted tokens
         """
         def f(txn):
             keyvalues = {
@@ -272,12 +272,12 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
                 values.append(except_token_id)
 
             txn.execute(
-                "SELECT token, device_id FROM access_tokens WHERE %s" % where_clause,
+                "SELECT token, id, device_id FROM access_tokens WHERE %s" % where_clause,
                 values
             )
-            tokens_and_devices = [(r[0], r[1]) for r in txn]
+            tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
 
-            for token, _ in tokens_and_devices:
+            for token, _, _ in tokens_and_devices:
                 self._invalidate_cache_and_stream(
                     txn, self.get_user_by_access_token, (token,)
                 )
diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql
index e2b775f038..b12f9b2ebf 100644
--- a/synapse/storage/schema/delta/44/expire_url_cache.sql
+++ b/synapse/storage/schema/delta/44/expire_url_cache.sql
@@ -13,7 +13,10 @@
  * limitations under the License.
  */
 
-CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
+-- this didn't work on SQLite 3.7 (because of lack of partial indexes), so was
+-- removed and replaced with 46/local_media_repository_url_idx.sql.
+--
+-- CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
 
 -- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support
 -- indices on expressions until 3.9.
diff --git a/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql
new file mode 100644
index 0000000000..bbfc7f5d1a
--- /dev/null
+++ b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql
@@ -0,0 +1,24 @@
+/* Copyright 2017 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- register a background update which will recreate the
+-- local_media_repository_url_idx index.
+--
+-- We do this as a bg update not because it is a particularly onerous
+-- operation, but because we'd like it to be a partial index if possible, and
+-- the background_index_update code will understand whether we are on
+-- postgres or sqlite and behave accordingly.
+INSERT INTO background_updates (update_name, progress_json) VALUES
+    ('local_media_repository_url_idx', '{}');