summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/errors.py27
-rw-r--r--synapse/app/appservice.py13
-rw-r--r--synapse/app/client_reader.py13
-rw-r--r--synapse/app/federation_reader.py13
-rw-r--r--synapse/app/federation_sender.py13
-rw-r--r--synapse/app/frontend_proxy.py13
-rwxr-xr-xsynapse/app/homeserver.py13
-rw-r--r--synapse/app/media_repository.py13
-rw-r--r--synapse/app/pusher.py13
-rw-r--r--synapse/app/synchrotron.py13
-rw-r--r--synapse/app/user_dir.py13
-rw-r--r--synapse/config/registration.py19
-rw-r--r--synapse/config/repository.py87
-rw-r--r--synapse/config/server.py22
-rw-r--r--synapse/federation/federation_client.py5
-rw-r--r--synapse/federation/transaction_queue.py4
-rw-r--r--synapse/federation/transport/client.py3
-rw-r--r--synapse/federation/transport/server.py9
-rw-r--r--synapse/handlers/device.py4
-rw-r--r--synapse/handlers/devicemessage.py14
-rw-r--r--synapse/handlers/e2e_keys.py21
-rw-r--r--synapse/handlers/federation.py15
-rw-r--r--synapse/handlers/register.py8
-rw-r--r--synapse/handlers/room_list.py3
-rw-r--r--synapse/http/client.py14
-rw-r--r--synapse/http/endpoint.py3
-rw-r--r--synapse/http/matrixfederationclient.py28
-rw-r--r--synapse/http/server.py29
-rw-r--r--synapse/http/site.py10
-rw-r--r--synapse/metrics/__init__.py7
-rw-r--r--synapse/metrics/metric.py9
-rw-r--r--synapse/push/httppusher.py37
-rw-r--r--synapse/rest/client/v1/admin.py22
-rw-r--r--synapse/rest/client/v1/register.py34
-rw-r--r--synapse/rest/client/v1/room.py17
-rw-r--r--synapse/rest/client/v2_alpha/account.py21
-rw-r--r--synapse/rest/client/v2_alpha/register.py69
-rw-r--r--synapse/rest/key/v2/remote_key_resource.py8
-rw-r--r--synapse/rest/media/v1/media_repository.py69
-rw-r--r--synapse/rest/media/v1/media_storage.py8
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py55
-rw-r--r--synapse/rest/media/v1/storage_provider.py31
-rw-r--r--synapse/rest/media/v1/thumbnail_resource.py2
-rw-r--r--synapse/server.py23
-rw-r--r--synapse/server.pyi3
-rw-r--r--synapse/state.py252
-rw-r--r--synapse/storage/_base.py55
-rw-r--r--synapse/storage/events.py69
-rw-r--r--synapse/storage/media_repository.py23
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/room.py153
-rw-r--r--synapse/storage/schema/delta/47/last_access_media.sql16
-rw-r--r--synapse/storage/user_directory.py13
-rw-r--r--synapse/util/logcontext.py48
-rw-r--r--synapse/util/metrics.py26
-rw-r--r--synapse/util/retryutils.py12
-rw-r--r--synapse/util/threepids.py48
57 files changed, 1112 insertions, 475 deletions
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 79b35b3e7c..aa15f73f36 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -46,6 +46,7 @@ class Codes(object):
     THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED"
     THREEPID_IN_USE = "M_THREEPID_IN_USE"
     THREEPID_NOT_FOUND = "M_THREEPID_NOT_FOUND"
+    THREEPID_DENIED = "M_THREEPID_DENIED"
     INVALID_USERNAME = "M_INVALID_USERNAME"
     SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
 
@@ -140,6 +141,32 @@ class RegistrationError(SynapseError):
     pass
 
 
+class FederationDeniedError(SynapseError):
+    """An error raised when the server tries to federate with a server which
+    is not on its federation whitelist.
+
+    Attributes:
+        destination (str): The destination which has been denied
+    """
+
+    def __init__(self, destination):
+        """Raised by federation client or server to indicate that we are
+        are deliberately not attempting to contact a given server because it is
+        not on our federation whitelist.
+
+        Args:
+            destination (str): the domain in question
+        """
+
+        self.destination = destination
+
+        super(FederationDeniedError, self).__init__(
+            code=403,
+            msg="Federation denied with %s." % (self.destination,),
+            errcode=Codes.FORBIDDEN,
+        )
+
+
 class InteractiveAuthIncompleteError(Exception):
     """An error raised when UI auth is not yet complete
 
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 7d0c2879ae..c6fe4516d1 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -49,19 +49,6 @@ class AppserviceSlaveStore(
 
 
 class AppserviceServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index dc3f6efd43..3b3352798d 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -64,19 +64,6 @@ class ClientReaderSlavedStore(
 
 
 class ClientReaderServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index a072291e1f..4de43c41f0 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -58,19 +58,6 @@ class FederationReaderSlavedStore(
 
 
 class FederationReaderServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 09e9488f06..f760826d27 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -76,19 +76,6 @@ class FederationSenderSlaveStore(
 
 
 class FederationSenderServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index ae531c0aa4..e32ee8fe93 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -118,19 +118,6 @@ class FrontendProxySlavedStore(
 
 
 class FrontendProxyServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 92ab3b311b..cb82a415a6 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -266,19 +266,6 @@ class SynapseHomeServer(HomeServer):
         except IncorrectDatabaseSetup as e:
             quit_with_error(e.message)
 
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
 
 def setup(config_options):
     """
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index eab1597aaa..1ed1ca8772 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -60,19 +60,6 @@ class MediaRepositorySlavedStore(
 
 
 class MediaRepositoryServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 7fbbb0b0e1..32ccea3f13 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -81,19 +81,6 @@ class PusherSlaveStore(
 
 
 class PusherServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = PusherSlaveStore(self.get_db_conn(), self)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 0abba3016e..f87531f1b6 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -246,19 +246,6 @@ class SynchrotronApplicationService(object):
 
 
 class SynchrotronServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index a48c4a2ae6..494ccb702c 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -92,19 +92,6 @@ class UserDirectorySlaveStore(
 
 
 class UserDirectoryServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index ef917fc9f2..336959094b 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -31,6 +31,8 @@ class RegistrationConfig(Config):
                 strtobool(str(config["disable_registration"]))
             )
 
+        self.registrations_require_3pid = config.get("registrations_require_3pid", [])
+        self.allowed_local_3pids = config.get("allowed_local_3pids", [])
         self.registration_shared_secret = config.get("registration_shared_secret")
 
         self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
@@ -52,6 +54,23 @@ class RegistrationConfig(Config):
         # Enable registration for new users.
         enable_registration: False
 
+        # The user must provide all of the below types of 3PID when registering.
+        #
+        # registrations_require_3pid:
+        #     - email
+        #     - msisdn
+
+        # Mandate that users are only allowed to associate certain formats of
+        # 3PIDs with accounts on this server.
+        #
+        # allowed_local_3pids:
+        #     - medium: email
+        #       pattern: ".*@matrix\\.org"
+        #     - medium: email
+        #       pattern: ".*@vector\\.im"
+        #     - medium: msisdn
+        #       pattern: "\\+44"
+
         # If set, allows registration by anyone who also has the shared
         # secret, even if registration is otherwise disabled.
         registration_shared_secret: "%(registration_shared_secret)s"
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 6baa474931..25ea77738a 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -16,6 +16,8 @@
 from ._base import Config, ConfigError
 from collections import namedtuple
 
+from synapse.util.module_loader import load_module
+
 
 MISSING_NETADDR = (
     "Missing netaddr library. This is required for URL preview API."
@@ -36,6 +38,14 @@ ThumbnailRequirement = namedtuple(
     "ThumbnailRequirement", ["width", "height", "method", "media_type"]
 )
 
+MediaStorageProviderConfig = namedtuple(
+    "MediaStorageProviderConfig", (
+        "store_local",  # Whether to store newly uploaded local files
+        "store_remote",  # Whether to store newly downloaded remote files
+        "store_synchronous",  # Whether to wait for successful storage for local uploads
+    ),
+)
+
 
 def parse_thumbnail_requirements(thumbnail_sizes):
     """ Takes a list of dictionaries with "width", "height", and "method" keys
@@ -73,16 +83,61 @@ class ContentRepositoryConfig(Config):
 
         self.media_store_path = self.ensure_directory(config["media_store_path"])
 
-        self.backup_media_store_path = config.get("backup_media_store_path")
-        if self.backup_media_store_path:
-            self.backup_media_store_path = self.ensure_directory(
-                self.backup_media_store_path
-            )
+        backup_media_store_path = config.get("backup_media_store_path")
 
-        self.synchronous_backup_media_store = config.get(
+        synchronous_backup_media_store = config.get(
             "synchronous_backup_media_store", False
         )
 
+        storage_providers = config.get("media_storage_providers", [])
+
+        if backup_media_store_path:
+            if storage_providers:
+                raise ConfigError(
+                    "Cannot use both 'backup_media_store_path' and 'storage_providers'"
+                )
+
+            storage_providers = [{
+                "module": "file_system",
+                "store_local": True,
+                "store_synchronous": synchronous_backup_media_store,
+                "store_remote": True,
+                "config": {
+                    "directory": backup_media_store_path,
+                }
+            }]
+
+        # This is a list of config that can be used to create the storage
+        # providers. The entries are tuples of (Class, class_config,
+        # MediaStorageProviderConfig), where Class is the class of the provider,
+        # the class_config the config to pass to it, and
+        # MediaStorageProviderConfig are options for StorageProviderWrapper.
+        #
+        # We don't create the storage providers here as not all workers need
+        # them to be started.
+        self.media_storage_providers = []
+
+        for provider_config in storage_providers:
+            # We special case the module "file_system" so as not to need to
+            # expose FileStorageProviderBackend
+            if provider_config["module"] == "file_system":
+                provider_config["module"] = (
+                    "synapse.rest.media.v1.storage_provider"
+                    ".FileStorageProviderBackend"
+                )
+
+            provider_class, parsed_config = load_module(provider_config)
+
+            wrapper_config = MediaStorageProviderConfig(
+                provider_config.get("store_local", False),
+                provider_config.get("store_remote", False),
+                provider_config.get("store_synchronous", False),
+            )
+
+            self.media_storage_providers.append(
+                (provider_class, parsed_config, wrapper_config,)
+            )
+
         self.uploads_path = self.ensure_directory(config["uploads_path"])
         self.dynamic_thumbnails = config["dynamic_thumbnails"]
         self.thumbnail_requirements = parse_thumbnail_requirements(
@@ -127,13 +182,19 @@ class ContentRepositoryConfig(Config):
         # Directory where uploaded images and attachments are stored.
         media_store_path: "%(media_store)s"
 
-        # A secondary directory where uploaded images and attachments are
-        # stored as a backup.
-        # backup_media_store_path: "%(media_store)s"
-
-        # Whether to wait for successful write to backup media store before
-        # returning successfully.
-        # synchronous_backup_media_store: false
+        # Media storage providers allow media to be stored in different
+        # locations.
+        # media_storage_providers:
+        # - module: file_system
+        #   # Whether to write new local files.
+        #   store_local: false
+        #   # Whether to write new remote media
+        #   store_remote: false
+        #   # Whether to block upload requests waiting for write to this
+        #   # provider to complete
+        #   store_synchronous: false
+        #   config:
+        #     directory: /mnt/some/other/directory
 
         # Directory where in-progress uploads are stored.
         uploads_path: "%(uploads_path)s"
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 436dd8a6fe..8f0b6d1f28 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -55,6 +55,17 @@ class ServerConfig(Config):
             "block_non_admin_invites", False,
         )
 
+        # FIXME: federation_domain_whitelist needs sytests
+        self.federation_domain_whitelist = None
+        federation_domain_whitelist = config.get(
+            "federation_domain_whitelist", None
+        )
+        # turn the whitelist into a hash for speed of lookup
+        if federation_domain_whitelist is not None:
+            self.federation_domain_whitelist = {}
+            for domain in federation_domain_whitelist:
+                self.federation_domain_whitelist[domain] = True
+
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != '/':
                 self.public_baseurl += '/'
@@ -210,6 +221,17 @@ class ServerConfig(Config):
         # (except those sent by local server admins). The default is False.
         # block_non_admin_invites: True
 
+        # Restrict federation to the following whitelist of domains.
+        # N.B. we recommend also firewalling your federation listener to limit
+        # inbound federation traffic as early as possible, rather than relying
+        # purely on this application-layer restriction.  If not specified, the
+        # default is to whitelist everything.
+        #
+        # federation_domain_whitelist:
+        #  - lon.example.com
+        #  - nyc.example.com
+        #  - syd.example.com
+
         # List of ports that Synapse should listen on, their purpose and their
         # configuration.
         listeners:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b1fe03f702..813907f7f2 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -23,7 +23,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import Membership
 from synapse.api.errors import (
-    CodeMessageException, HttpResponseException, SynapseError,
+    CodeMessageException, HttpResponseException, SynapseError, FederationDeniedError
 )
 from synapse.events import builder
 from synapse.federation.federation_base import (
@@ -266,6 +266,9 @@ class FederationClient(FederationBase):
             except NotRetryingDestination as e:
                 logger.info(e.message)
                 continue
+            except FederationDeniedError as e:
+                logger.info(e.message)
+                continue
             except Exception as e:
                 pdu_attempts[destination] = now
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 9d39f46583..a141ec9953 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,7 +19,7 @@ from twisted.internet import defer
 from .persistence import TransactionActions
 from .units import Transaction, Edu
 
-from synapse.api.errors import HttpResponseException
+from synapse.api.errors import HttpResponseException, FederationDeniedError
 from synapse.util import logcontext, PreserveLoggingContext
 from synapse.util.async import run_on_reactor
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -490,6 +490,8 @@ class TransactionQueue(object):
                     (e.retry_last_ts + e.retry_interval) / 1000.0
                 ),
             )
+        except FederationDeniedError as e:
+            logger.info(e)
         except Exception as e:
             logger.warn(
                 "TX [%s] Failed to send transaction: %s",
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 1f3ce238f6..5488e82985 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -212,6 +212,9 @@ class TransportLayerClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if the remote destination
+            is not in our federation whitelist
         """
         valid_memberships = {Membership.JOIN, Membership.LEAVE}
         if membership not in valid_memberships:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 2b02b021ec..06c16ba4fa 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.urls import FEDERATION_PREFIX as PREFIX
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, SynapseError, FederationDeniedError
 from synapse.http.server import JsonResource
 from synapse.http.servlet import (
     parse_json_object_from_request, parse_integer_from_args, parse_string_from_args,
@@ -81,6 +81,7 @@ class Authenticator(object):
         self.keyring = hs.get_keyring()
         self.server_name = hs.hostname
         self.store = hs.get_datastore()
+        self.federation_domain_whitelist = hs.config.federation_domain_whitelist
 
     # A method just so we can pass 'self' as the authenticator to the Servlets
     @defer.inlineCallbacks
@@ -92,6 +93,12 @@ class Authenticator(object):
             "signatures": {},
         }
 
+        if (
+            self.federation_domain_whitelist is not None and
+            self.server_name not in self.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(self.server_name)
+
         if content is not None:
             json_request["content"] = content
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 2152efc692..0e83453851 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 from synapse.api import errors
 from synapse.api.constants import EventTypes
+from synapse.api.errors import FederationDeniedError
 from synapse.util import stringutils
 from synapse.util.async import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -513,6 +514,9 @@ class DeviceListEduUpdater(object):
                     # This makes it more likely that the device lists will
                     # eventually become consistent.
                     return
+                except FederationDeniedError as e:
+                    logger.info(e)
+                    return
                 except Exception:
                     # TODO: Remember that we are now out of sync and try again
                     # later
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index f7fad15c62..d996aa90bb 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -17,7 +17,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.types import get_domain_from_id
+from synapse.api.errors import SynapseError
+from synapse.types import get_domain_from_id, UserID
 from synapse.util.stringutils import random_string
 
 
@@ -33,7 +34,7 @@ class DeviceMessageHandler(object):
         """
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
-        self.is_mine_id = hs.is_mine_id
+        self.is_mine = hs.is_mine
         self.federation = hs.get_federation_sender()
 
         hs.get_replication_layer().register_edu_handler(
@@ -52,6 +53,12 @@ class DeviceMessageHandler(object):
         message_type = content["type"]
         message_id = content["message_id"]
         for user_id, by_device in content["messages"].items():
+            # we use UserID.from_string to catch invalid user ids
+            if not self.is_mine(UserID.from_string(user_id)):
+                logger.warning("Request for keys for non-local user %s",
+                               user_id)
+                raise SynapseError(400, "Not a user here")
+
             messages_by_device = {
                 device_id: {
                     "content": message_content,
@@ -77,7 +84,8 @@ class DeviceMessageHandler(object):
         local_messages = {}
         remote_messages = {}
         for user_id, by_device in messages.items():
-            if self.is_mine_id(user_id):
+            # we use UserID.from_string to catch invalid user ids
+            if self.is_mine(UserID.from_string(user_id)):
                 messages_by_device = {
                     device_id: {
                         "content": message_content,
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 668a90e495..9aa95f89e6 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -19,8 +19,10 @@ import logging
 from canonicaljson import encode_canonical_json
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError, CodeMessageException
-from synapse.types import get_domain_from_id
+from synapse.api.errors import (
+    SynapseError, CodeMessageException, FederationDeniedError,
+)
+from synapse.types import get_domain_from_id, UserID
 from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
 from synapse.util.retryutils import NotRetryingDestination
 
@@ -32,7 +34,7 @@ class E2eKeysHandler(object):
         self.store = hs.get_datastore()
         self.federation = hs.get_replication_layer()
         self.device_handler = hs.get_device_handler()
-        self.is_mine_id = hs.is_mine_id
+        self.is_mine = hs.is_mine
         self.clock = hs.get_clock()
 
         # doesn't really work as part of the generic query API, because the
@@ -70,7 +72,8 @@ class E2eKeysHandler(object):
         remote_queries = {}
 
         for user_id, device_ids in device_keys_query.items():
-            if self.is_mine_id(user_id):
+            # we use UserID.from_string to catch invalid user ids
+            if self.is_mine(UserID.from_string(user_id)):
                 local_query[user_id] = device_ids
             else:
                 remote_queries[user_id] = device_ids
@@ -139,6 +142,10 @@ class E2eKeysHandler(object):
                 failures[destination] = {
                     "status": 503, "message": "Not ready for retry",
                 }
+            except FederationDeniedError as e:
+                failures[destination] = {
+                    "status": 403, "message": "Federation Denied",
+                }
             except Exception as e:
                 # include ConnectionRefused and other errors
                 failures[destination] = {
@@ -170,7 +177,8 @@ class E2eKeysHandler(object):
 
         result_dict = {}
         for user_id, device_ids in query.items():
-            if not self.is_mine_id(user_id):
+            # we use UserID.from_string to catch invalid user ids
+            if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s",
                                user_id)
                 raise SynapseError(400, "Not a user here")
@@ -213,7 +221,8 @@ class E2eKeysHandler(object):
         remote_queries = {}
 
         for user_id, device_keys in query.get("one_time_keys", {}).items():
-            if self.is_mine_id(user_id):
+            # we use UserID.from_string to catch invalid user ids
+            if self.is_mine(UserID.from_string(user_id)):
                 for device_id, algorithm in device_keys.items():
                     local_query.append((user_id, device_id, algorithm))
             else:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ac70730885..8ee9434c9b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -22,6 +22,7 @@ from ._base import BaseHandler
 
 from synapse.api.errors import (
     AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+    FederationDeniedError,
 )
 from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.events.validator import EventValidator
@@ -782,6 +783,9 @@ class FederationHandler(BaseHandler):
                 except NotRetryingDestination as e:
                     logger.info(e.message)
                     continue
+                except FederationDeniedError as e:
+                    logger.info(e)
+                    continue
                 except Exception as e:
                     logger.exception(
                         "Failed to backfill from %s because %s",
@@ -804,13 +808,12 @@ class FederationHandler(BaseHandler):
         event_ids = list(extremities.keys())
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
+        resolve = logcontext.preserve_fn(
+            self.state_handler.resolve_state_groups_for_events
+        )
         states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                logcontext.preserve_fn(self.state_handler.resolve_state_groups)(
-                    room_id, [e]
-                )
-                for e in event_ids
-            ], consumeErrors=True,
+            [resolve(room_id, [e]) for e in event_ids],
+            consumeErrors=True,
         ))
         states = dict(zip(event_ids, [s.state for s in states]))
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 5b808beac1..9021d4d57f 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -25,6 +25,7 @@ from synapse.http.client import CaptchaServerHttpClient
 from synapse import types
 from synapse.types import UserID
 from synapse.util.async import run_on_reactor
+from synapse.util.threepids import check_3pid_allowed
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
@@ -293,7 +294,7 @@ class RegistrationHandler(BaseHandler):
         """
 
         for c in threepidCreds:
-            logger.info("validating theeepidcred sid %s on id server %s",
+            logger.info("validating threepidcred sid %s on id server %s",
                         c['sid'], c['idServer'])
             try:
                 identity_handler = self.hs.get_handlers().identity_handler
@@ -307,6 +308,11 @@ class RegistrationHandler(BaseHandler):
             logger.info("got threepid with medium '%s' and address '%s'",
                         threepid['medium'], threepid['address'])
 
+            if not check_3pid_allowed(self.hs, threepid['medium'], threepid['address']):
+                raise RegistrationError(
+                    403, "Third party identifier is not allowed"
+                )
+
     @defer.inlineCallbacks
     def bind_emails(self, user_id, threepidCreds):
         """Links emails with a user ID and informs an identity server.
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index bb40075387..dfa09141ed 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -203,7 +203,8 @@ class RoomListHandler(BaseHandler):
         if limit:
             step = limit + 1
         else:
-            step = len(rooms_to_scan)
+            # step cannot be zero
+            step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
 
         chunk = []
         for i in xrange(0, len(rooms_to_scan), step):
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 4abb479ae3..f3e4973c2e 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -18,6 +18,7 @@ from OpenSSL.SSL import VERIFY_NONE
 from synapse.api.errors import (
     CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
 )
+from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util import logcontext
 import synapse.metrics
@@ -30,6 +31,7 @@ from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.web.client import (
     BrowserLikeRedirectAgent, ContentDecoderAgent, GzipDecoder, Agent,
     readBody, PartialDownloadError,
+    HTTPConnectionPool,
 )
 from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
 from twisted.web.http import PotentialDataLoss
@@ -64,13 +66,23 @@ class SimpleHttpClient(object):
     """
     def __init__(self, hs):
         self.hs = hs
+
+        pool = HTTPConnectionPool(reactor)
+
+        # the pusher makes lots of concurrent SSL connections to sygnal, and
+        # tends to do so in batches, so we need to allow the pool to keep lots
+        # of idle connections around.
+        pool.maxPersistentPerHost = max((100 * CACHE_SIZE_FACTOR, 5))
+        pool.cachedConnectionTimeout = 2 * 60
+
         # The default context factory in Twisted 14.0.0 (which we require) is
         # BrowserLikePolicyForHTTPS which will do regular cert validation
         # 'like a browser'
         self.agent = Agent(
             reactor,
             connectTimeout=15,
-            contextFactory=hs.get_http_client_context_factory()
+            contextFactory=hs.get_http_client_context_factory(),
+            pool=pool,
         )
         self.user_agent = hs.version_string
         self.clock = hs.get_clock()
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index e2b99ef3bd..87639b9151 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -357,8 +357,7 @@ def _get_hosts_for_srv_record(dns_client, host):
     def eb(res, record_type):
         if res.check(DNSNameError):
             return []
-        logger.warn("Error looking up %s for %s: %s",
-                    record_type, host, res, res.value)
+        logger.warn("Error looking up %s for %s: %s", record_type, host, res)
         return res
 
     # no logcontexts here, so we can safely fire these off and gatherResults
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 833496b72d..9145405cb0 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -27,7 +27,7 @@ import synapse.metrics
 from canonicaljson import encode_canonical_json
 
 from synapse.api.errors import (
-    SynapseError, Codes, HttpResponseException,
+    SynapseError, Codes, HttpResponseException, FederationDeniedError,
 )
 
 from signedjson.sign import sign_json
@@ -123,11 +123,22 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``HTTPRequestException``: if we get an HTTP response
                 code >= 300.
+
             Fails with ``NotRetryingDestination`` if we are not yet ready
                 to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+                is not on our federation whitelist
+
             (May also fail with plenty of other Exceptions for things like DNS
                 failures, connection failures, SSL failures.)
         """
+        if (
+            self.hs.config.federation_domain_whitelist and
+            destination not in self.hs.config.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(destination)
+
         limiter = yield synapse.util.retryutils.get_retry_limiter(
             destination,
             self.clock,
@@ -308,6 +319,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
 
         if not json_data_callback:
@@ -368,6 +382,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
 
         def body_callback(method, url_bytes, headers_dict):
@@ -422,6 +439,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
         logger.debug("get_json args: %s", args)
 
@@ -475,6 +495,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
 
         response = yield self._request(
@@ -518,6 +541,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
 
         encoded_args = {}
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 269b65ca41..165c684d0d 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -93,6 +93,8 @@ response_db_txn_count = metrics.register_counter(
     ),
 )
 
+# seconds spent waiting for db txns, excluding scheduling time, when processing
+# this request
 response_db_txn_duration = metrics.register_counter(
     "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
     alternative_names=(
@@ -100,6 +102,10 @@ response_db_txn_duration = metrics.register_counter(
     ),
 )
 
+# seconds spent waiting for a db connection, when processing this request
+response_db_sched_duration = metrics.register_counter(
+    "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
+)
 
 _next_request_id = 0
 
@@ -316,15 +322,6 @@ class JsonResource(HttpServer, resource.Resource):
 
     def _send_response(self, request, code, response_json_object,
                        response_code_message=None):
-        # could alternatively use request.notifyFinish() and flip a flag when
-        # the Deferred fires, but since the flag is RIGHT THERE it seems like
-        # a waste.
-        if request._disconnected:
-            logger.warn(
-                "Not sending response to request %s, already disconnected.",
-                request)
-            return
-
         outgoing_responses_counter.inc(request.method, str(code))
 
         # TODO: Only enable CORS for the requests that need it.
@@ -377,7 +374,10 @@ class RequestMetrics(object):
             context.db_txn_count, request.method, self.name, tag
         )
         response_db_txn_duration.inc_by(
-            context.db_txn_duration, request.method, self.name, tag
+            context.db_txn_duration_ms / 1000., request.method, self.name, tag
+        )
+        response_db_sched_duration.inc_by(
+            context.db_sched_duration_ms / 1000., request.method, self.name, tag
         )
 
 
@@ -400,6 +400,15 @@ class RootRedirect(resource.Resource):
 def respond_with_json(request, code, json_object, send_cors=False,
                       response_code_message=None, pretty_print=False,
                       version_string="", canonical_json=True):
+    # could alternatively use request.notifyFinish() and flip a flag when
+    # the Deferred fires, but since the flag is RIGHT THERE it seems like
+    # a waste.
+    if request._disconnected:
+        logger.warn(
+            "Not sending response to request %s, already disconnected.",
+            request)
+        return
+
     if pretty_print:
         json_bytes = encode_pretty_printed_json(json_object) + "\n"
     else:
diff --git a/synapse/http/site.py b/synapse/http/site.py
index cd1492b1c3..e422c8dfae 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -66,14 +66,15 @@ class SynapseRequest(Request):
             context = LoggingContext.current_context()
             ru_utime, ru_stime = context.get_resource_usage()
             db_txn_count = context.db_txn_count
-            db_txn_duration = context.db_txn_duration
+            db_txn_duration_ms = context.db_txn_duration_ms
+            db_sched_duration_ms = context.db_sched_duration_ms
         except Exception:
             ru_utime, ru_stime = (0, 0)
-            db_txn_count, db_txn_duration = (0, 0)
+            db_txn_count, db_txn_duration_ms = (0, 0)
 
         self.site.access_logger.info(
             "%s - %s - {%s}"
-            " Processed request: %dms (%dms, %dms) (%dms/%d)"
+            " Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
             " %sB %s \"%s %s %s\" \"%s\"",
             self.getClientIP(),
             self.site.site_tag,
@@ -81,7 +82,8 @@ class SynapseRequest(Request):
             int(time.time() * 1000) - self.start_time,
             int(ru_utime * 1000),
             int(ru_stime * 1000),
-            int(db_txn_duration * 1000),
+            db_sched_duration_ms,
+            db_txn_duration_ms,
             int(db_txn_count),
             self.sentLength,
             self.code,
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 2265e6e8d6..e0cfb7d08f 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -146,10 +146,15 @@ def runUntilCurrentTimer(func):
             num_pending += 1
 
         num_pending += len(reactor.threadCallQueue)
-
         start = time.time() * 1000
         ret = func(*args, **kwargs)
         end = time.time() * 1000
+
+        # record the amount of wallclock time spent running pending calls.
+        # This is a proxy for the actual amount of time between reactor polls,
+        # since about 25% of time is actually spent running things triggered by
+        # I/O events, but that is harder to capture without rewriting half the
+        # reactor.
         tick_time.inc_by(end - start)
         pending_calls_metric.inc_by(num_pending)
 
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index f480aae614..1e783e5ff4 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -15,6 +15,9 @@
 
 
 from itertools import chain
+import logging
+
+logger = logging.getLogger(__name__)
 
 
 def flatten(items):
@@ -153,7 +156,11 @@ class CallbackMetric(BaseMetric):
         self.callback = callback
 
     def render(self):
-        value = self.callback()
+        try:
+            value = self.callback()
+        except Exception:
+            logger.exception("Failed to render %s", self.name)
+            return ["# FAILED to render " + self.name]
 
         if self.is_scalar():
             return list(self._render_for_labels([], value))
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index c16f61452c..2cbac571b8 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -13,21 +13,30 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from synapse.push import PusherConfigException
+import logging
 
 from twisted.internet import defer, reactor
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
-import logging
 import push_rule_evaluator
 import push_tools
-
+import synapse
+from synapse.push import PusherConfigException
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+http_push_processed_counter = metrics.register_counter(
+    "http_pushes_processed",
+)
+
+http_push_failed_counter = metrics.register_counter(
+    "http_pushes_failed",
+)
+
 
 class HttpPusher(object):
     INITIAL_BACKOFF_SEC = 1  # in seconds because that's what Twisted takes
@@ -152,9 +161,16 @@ class HttpPusher(object):
             self.user_id, self.last_stream_ordering, self.max_stream_ordering
         )
 
+        logger.info(
+            "Processing %i unprocessed push actions for %s starting at "
+            "stream_ordering %s",
+            len(unprocessed), self.name, self.last_stream_ordering,
+        )
+
         for push_action in unprocessed:
             processed = yield self._process_one(push_action)
             if processed:
+                http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action['stream_ordering']
                 yield self.store.update_pusher_last_stream_ordering_and_success(
@@ -169,6 +185,7 @@ class HttpPusher(object):
                         self.failing_since
                     )
             else:
+                http_push_failed_counter.inc()
                 if not self.failing_since:
                     self.failing_since = self.clock.time_msec()
                     yield self.store.update_pusher_failing_since(
@@ -316,7 +333,10 @@ class HttpPusher(object):
         try:
             resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
         except Exception:
-            logger.warn("Failed to push %s ", self.url)
+            logger.warn(
+                "Failed to push event %s to %s",
+                event.event_id, self.name, exc_info=True,
+            )
             defer.returnValue(False)
         rejected = []
         if 'rejected' in resp:
@@ -325,7 +345,7 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def _send_badge(self, badge):
-        logger.info("Sending updated badge count %d to %r", badge, self.user_id)
+        logger.info("Sending updated badge count %d to %s", badge, self.name)
         d = {
             'notification': {
                 'id': '',
@@ -347,7 +367,10 @@ class HttpPusher(object):
         try:
             resp = yield self.http_client.post_json_get_json(self.url, d)
         except Exception:
-            logger.exception("Failed to push %s ", self.url)
+            logger.warn(
+                "Failed to send badge count to %s",
+                self.name, exc_info=True,
+            )
             defer.returnValue(False)
         rejected = []
         if 'rejected' in resp:
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 5022808ea9..0615e5d807 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -289,6 +289,27 @@ class QuarantineMediaInRoom(ClientV1RestServlet):
         defer.returnValue((200, {"num_quarantined": num_quarantined}))
 
 
+class ListMediaInRoom(ClientV1RestServlet):
+    """Lists all of the media in a given room.
+    """
+    PATTERNS = client_path_patterns("/admin/room/(?P<room_id>[^/]+)/media")
+
+    def __init__(self, hs):
+        super(ListMediaInRoom, self).__init__(hs)
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
+
+        defer.returnValue((200, {"local": local_mxcs, "remote": remote_mxcs}))
+
+
 class ResetPasswordRestServlet(ClientV1RestServlet):
     """Post request to allow an administrator reset password for a user.
     This needs user to have administrator access in Synapse.
@@ -487,3 +508,4 @@ def register_servlets(hs, http_server):
     SearchUsersRestServlet(hs).register(http_server)
     ShutdownRoomRestServlet(hs).register(http_server)
     QuarantineMediaInRoom(hs).register(http_server)
+    ListMediaInRoom(hs).register(http_server)
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 32ed1d3ab2..5c5fa8f7ab 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -70,10 +70,15 @@ class RegisterRestServlet(ClientV1RestServlet):
         self.handlers = hs.get_handlers()
 
     def on_GET(self, request):
+
+        require_email = 'email' in self.hs.config.registrations_require_3pid
+        require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid
+
+        flows = []
         if self.hs.config.enable_registration_captcha:
-            return (
-                200,
-                {"flows": [
+            # only support the email-only flow if we don't require MSISDN 3PIDs
+            if not require_msisdn:
+                flows.extend([
                     {
                         "type": LoginType.RECAPTCHA,
                         "stages": [
@@ -82,27 +87,34 @@ class RegisterRestServlet(ClientV1RestServlet):
                             LoginType.PASSWORD
                         ]
                     },
+                ])
+            # only support 3PIDless registration if no 3PIDs are required
+            if not require_email and not require_msisdn:
+                flows.extend([
                     {
                         "type": LoginType.RECAPTCHA,
                         "stages": [LoginType.RECAPTCHA, LoginType.PASSWORD]
                     }
-                ]}
-            )
+                ])
         else:
-            return (
-                200,
-                {"flows": [
+            # only support the email-only flow if we don't require MSISDN 3PIDs
+            if require_email or not require_msisdn:
+                flows.extend([
                     {
                         "type": LoginType.EMAIL_IDENTITY,
                         "stages": [
                             LoginType.EMAIL_IDENTITY, LoginType.PASSWORD
                         ]
-                    },
+                    }
+                ])
+            # only support 3PIDless registration if no 3PIDs are required
+            if not require_email and not require_msisdn:
+                flows.extend([
                     {
                         "type": LoginType.PASSWORD
                     }
-                ]}
-            )
+                ])
+        return (200, {"flows": flows})
 
     @defer.inlineCallbacks
     def on_POST(self, request):
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 682a0af9fc..867ec8602c 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -195,15 +195,20 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         content = parse_json_object_from_request(request)
 
+        event_dict = {
+            "type": event_type,
+            "content": content,
+            "room_id": room_id,
+            "sender": requester.user.to_string(),
+        }
+
+        if 'ts' in request.args and requester.app_service:
+            event_dict['origin_server_ts'] = parse_integer(request, "ts", 0)
+
         msg_handler = self.handlers.message_handler
         event = yield msg_handler.create_and_send_nonmember_event(
             requester,
-            {
-                "type": event_type,
-                "content": content,
-                "room_id": room_id,
-                "sender": requester.user.to_string(),
-            },
+            event_dict,
             txn_id=txn_id,
         )
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 385a3ad2ec..30523995af 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -26,6 +26,7 @@ from synapse.http.servlet import (
 )
 from synapse.util.async import run_on_reactor
 from synapse.util.msisdn import phone_number_to_msisdn
+from synapse.util.threepids import check_3pid_allowed
 from ._base import client_v2_patterns, interactive_auth_handler
 
 logger = logging.getLogger(__name__)
@@ -47,6 +48,11 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
             'id_server', 'client_secret', 'email', 'send_attempt'
         ])
 
+        if not check_3pid_allowed(self.hs, "email", body['email']):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
             'email', body['email']
         )
@@ -78,6 +84,11 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
 
+        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.datastore.get_user_id_by_threepid(
             'msisdn', msisdn
         )
@@ -217,6 +228,11 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
         if absent:
             raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
 
+        if not check_3pid_allowed(self.hs, "email", body['email']):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.datastore.get_user_id_by_threepid(
             'email', body['email']
         )
@@ -255,6 +271,11 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
 
+        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.datastore.get_user_id_by_threepid(
             'msisdn', msisdn
         )
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index e9d88a8895..c6f4680a76 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -26,6 +26,7 @@ from synapse.http.servlet import (
     RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string
 )
 from synapse.util.msisdn import phone_number_to_msisdn
+from synapse.util.threepids import check_3pid_allowed
 
 from ._base import client_v2_patterns, interactive_auth_handler
 
@@ -70,6 +71,11 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
             'id_server', 'client_secret', 'email', 'send_attempt'
         ])
 
+        if not check_3pid_allowed(self.hs, "email", body['email']):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
             'email', body['email']
         )
@@ -105,6 +111,11 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
 
+        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
             'msisdn', msisdn
         )
@@ -305,31 +316,67 @@ class RegisterRestServlet(RestServlet):
         if 'x_show_msisdn' in body and body['x_show_msisdn']:
             show_msisdn = True
 
+        # FIXME: need a better error than "no auth flow found" for scenarios
+        # where we required 3PID for registration but the user didn't give one
+        require_email = 'email' in self.hs.config.registrations_require_3pid
+        require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid
+
+        flows = []
         if self.hs.config.enable_registration_captcha:
-            flows = [
-                [LoginType.RECAPTCHA],
-                [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
-            ]
+            # only support 3PIDless registration if no 3PIDs are required
+            if not require_email and not require_msisdn:
+                flows.extend([[LoginType.RECAPTCHA]])
+            # only support the email-only flow if we don't require MSISDN 3PIDs
+            if not require_msisdn:
+                flows.extend([[LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA]])
+
             if show_msisdn:
+                # only support the MSISDN-only flow if we don't require email 3PIDs
+                if not require_email:
+                    flows.extend([[LoginType.MSISDN, LoginType.RECAPTCHA]])
+                # always let users provide both MSISDN & email
                 flows.extend([
-                    [LoginType.MSISDN, LoginType.RECAPTCHA],
                     [LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
                 ])
         else:
-            flows = [
-                [LoginType.DUMMY],
-                [LoginType.EMAIL_IDENTITY],
-            ]
+            # only support 3PIDless registration if no 3PIDs are required
+            if not require_email and not require_msisdn:
+                flows.extend([[LoginType.DUMMY]])
+            # only support the email-only flow if we don't require MSISDN 3PIDs
+            if not require_msisdn:
+                flows.extend([[LoginType.EMAIL_IDENTITY]])
+
             if show_msisdn:
+                # only support the MSISDN-only flow if we don't require email 3PIDs
+                if not require_email or require_msisdn:
+                    flows.extend([[LoginType.MSISDN]])
+                # always let users provide both MSISDN & email
                 flows.extend([
-                    [LoginType.MSISDN],
-                    [LoginType.MSISDN, LoginType.EMAIL_IDENTITY],
+                    [LoginType.MSISDN, LoginType.EMAIL_IDENTITY]
                 ])
 
         auth_result, params, session_id = yield self.auth_handler.check_auth(
             flows, body, self.hs.get_ip_from_request(request)
         )
 
+        # Check that we're not trying to register a denied 3pid.
+        #
+        # the user-facing checks will probably already have happened in
+        # /register/email/requestToken when we requested a 3pid, but that's not
+        # guaranteed.
+
+        if auth_result:
+            for login_type in [LoginType.EMAIL_IDENTITY, LoginType.MSISDN]:
+                if login_type in auth_result:
+                    medium = auth_result[login_type]['medium']
+                    address = auth_result[login_type]['address']
+
+                    if not check_3pid_allowed(self.hs, medium, address):
+                        raise SynapseError(
+                            403, "Third party identifier is not allowed",
+                            Codes.THREEPID_DENIED,
+                        )
+
         if registered_user_id is not None:
             logger.info(
                 "Already registered user ID %r for this session",
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index cc2842aa72..17e6079cba 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -93,6 +93,7 @@ class RemoteKey(Resource):
         self.store = hs.get_datastore()
         self.version_string = hs.version_string
         self.clock = hs.get_clock()
+        self.federation_domain_whitelist = hs.config.federation_domain_whitelist
 
     def render_GET(self, request):
         self.async_render_GET(request)
@@ -137,6 +138,13 @@ class RemoteKey(Resource):
         logger.info("Handling query for keys %r", query)
         store_queries = []
         for server_name, key_ids in query.items():
+            if (
+                self.federation_domain_whitelist is not None and
+                server_name not in self.federation_domain_whitelist
+            ):
+                logger.debug("Federation denied with %s", server_name)
+                continue
+
             if not key_ids:
                 key_ids = (None,)
             for key_id in key_ids:
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 578d7f07c5..bb79599379 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -27,15 +27,14 @@ from .identicon_resource import IdenticonResource
 from .preview_url_resource import PreviewUrlResource
 from .filepath import MediaFilePaths
 from .thumbnailer import Thumbnailer
-from .storage_provider import (
-    StorageProviderWrapper, FileStorageProviderBackend,
-)
+from .storage_provider import StorageProviderWrapper
 from .media_storage import MediaStorage
 
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.util.stringutils import random_string
-from synapse.api.errors import SynapseError, HttpResponseException, \
-    NotFoundError
+from synapse.api.errors import (
+    SynapseError, HttpResponseException, NotFoundError, FederationDeniedError,
+)
 
 from synapse.util.async import Linearizer
 from synapse.util.stringutils import is_ascii
@@ -53,7 +52,7 @@ import urlparse
 logger = logging.getLogger(__name__)
 
 
-UPDATE_RECENTLY_ACCESSED_REMOTES_TS = 60 * 1000
+UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000
 
 
 class MediaRepository(object):
@@ -75,22 +74,21 @@ class MediaRepository(object):
         self.remote_media_linearizer = Linearizer(name="media_remote")
 
         self.recently_accessed_remotes = set()
+        self.recently_accessed_locals = set()
+
+        self.federation_domain_whitelist = hs.config.federation_domain_whitelist
 
         # List of StorageProviders where we should search for media and
         # potentially upload to.
         storage_providers = []
 
-        # TODO: Move this into config and allow other storage providers to be
-        # defined.
-        if hs.config.backup_media_store_path:
-            backend = FileStorageProviderBackend(
-                self.primary_base_path, hs.config.backup_media_store_path,
-            )
+        for clz, provider_config, wrapper_config in hs.config.media_storage_providers:
+            backend = clz(hs, provider_config)
             provider = StorageProviderWrapper(
                 backend,
-                store=True,
-                store_synchronous=hs.config.synchronous_backup_media_store,
-                store_remote=True,
+                store_local=wrapper_config.store_local,
+                store_remote=wrapper_config.store_remote,
+                store_synchronous=wrapper_config.store_synchronous,
             )
             storage_providers.append(provider)
 
@@ -99,19 +97,34 @@ class MediaRepository(object):
         )
 
         self.clock.looping_call(
-            self._update_recently_accessed_remotes,
-            UPDATE_RECENTLY_ACCESSED_REMOTES_TS
+            self._update_recently_accessed,
+            UPDATE_RECENTLY_ACCESSED_TS,
         )
 
     @defer.inlineCallbacks
-    def _update_recently_accessed_remotes(self):
-        media = self.recently_accessed_remotes
+    def _update_recently_accessed(self):
+        remote_media = self.recently_accessed_remotes
         self.recently_accessed_remotes = set()
 
+        local_media = self.recently_accessed_locals
+        self.recently_accessed_locals = set()
+
         yield self.store.update_cached_last_access_time(
-            media, self.clock.time_msec()
+            local_media, remote_media, self.clock.time_msec()
         )
 
+    def mark_recently_accessed(self, server_name, media_id):
+        """Mark the given media as recently accessed.
+
+        Args:
+            server_name (str|None): Origin server of media, or None if local
+            media_id (str): The media ID of the content
+        """
+        if server_name:
+            self.recently_accessed_remotes.add((server_name, media_id))
+        else:
+            self.recently_accessed_locals.add(media_id)
+
     @defer.inlineCallbacks
     def create_content(self, media_type, upload_name, content, content_length,
                        auth_user):
@@ -173,6 +186,8 @@ class MediaRepository(object):
             respond_404(request)
             return
 
+        self.mark_recently_accessed(None, media_id)
+
         media_type = media_info["media_type"]
         media_length = media_info["media_length"]
         upload_name = name if name else media_info["upload_name"]
@@ -204,7 +219,13 @@ class MediaRepository(object):
             Deferred: Resolves once a response has successfully been written
                 to request
         """
-        self.recently_accessed_remotes.add((server_name, media_id))
+        if (
+            self.federation_domain_whitelist is not None and
+            server_name not in self.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(server_name)
+
+        self.mark_recently_accessed(server_name, media_id)
 
         # We linearize here to ensure that we don't try and download remote
         # media multiple times concurrently
@@ -238,6 +259,12 @@ class MediaRepository(object):
         Returns:
             Deferred[dict]: The media_info of the file
         """
+        if (
+            self.federation_domain_whitelist is not None and
+            server_name not in self.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(server_name)
+
         # We linearize here to ensure that we don't try and download remote
         # media multiple times concurrently
         key = (server_name, media_id)
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 5c3e4e5a65..e8e8b3986d 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -197,6 +197,14 @@ class MediaStorage(object):
             str
         """
         if file_info.url_cache:
+            if file_info.thumbnail:
+                return self.filepaths.url_cache_thumbnail_rel(
+                    media_id=file_info.file_id,
+                    width=file_info.thumbnail_width,
+                    height=file_info.thumbnail_height,
+                    content_type=file_info.thumbnail_type,
+                    method=file_info.thumbnail_method,
+                )
             return self.filepaths.url_cache_filepath_rel(file_info.file_id)
 
         if file_info.server_name:
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 981f01e417..31fe7aa75c 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -12,6 +12,19 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import cgi
+import datetime
+import errno
+import fnmatch
+import itertools
+import logging
+import os
+import re
+import shutil
+import sys
+import traceback
+import ujson as json
+import urlparse
 
 from twisted.web.server import NOT_DONE_YET
 from twisted.internet import defer
@@ -33,18 +46,6 @@ from synapse.http.server import (
 from synapse.util.async import ObservableDeferred
 from synapse.util.stringutils import is_ascii
 
-import os
-import re
-import fnmatch
-import cgi
-import ujson as json
-import urlparse
-import itertools
-import datetime
-import errno
-import shutil
-
-import logging
 logger = logging.getLogger(__name__)
 
 
@@ -286,17 +287,28 @@ class PreviewUrlResource(Resource):
             url_cache=True,
         )
 
-        try:
-            with self.media_storage.store_into_file(file_info) as (f, fname, finish):
+        with self.media_storage.store_into_file(file_info) as (f, fname, finish):
+            try:
                 logger.debug("Trying to get url '%s'" % url)
                 length, headers, uri, code = yield self.client.get_file(
                     url, output_stream=f, max_size=self.max_spider_size,
                 )
+            except Exception as e:
                 # FIXME: pass through 404s and other error messages nicely
+                logger.warn("Error downloading %s: %r", url, e)
+                raise SynapseError(
+                    500, "Failed to download content: %s" % (
+                        traceback.format_exception_only(sys.exc_type, e),
+                    ),
+                    Codes.UNKNOWN,
+                )
+            yield finish()
 
-                yield finish()
-
-            media_type = headers["Content-Type"][0]
+        try:
+            if "Content-Type" in headers:
+                media_type = headers["Content-Type"][0]
+            else:
+                media_type = "application/octet-stream"
             time_now_ms = self.clock.time_msec()
 
             content_disposition = headers.get("Content-Disposition", None)
@@ -336,10 +348,11 @@ class PreviewUrlResource(Resource):
             )
 
         except Exception as e:
-            raise SynapseError(
-                500, ("Failed to download content: %s" % e),
-                Codes.UNKNOWN
-            )
+            logger.error("Error handling downloaded %s: %r", url, e)
+            # TODO: we really ought to delete the downloaded file in this
+            # case, since we won't have recorded it in the db, and will
+            # therefore not expire it.
+            raise
 
         defer.returnValue({
             "media_type": media_type,
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 2ad602e101..c188192f2b 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -17,6 +17,7 @@ from twisted.internet import defer, threads
 
 from .media_storage import FileResponder
 
+from synapse.config._base import Config
 from synapse.util.logcontext import preserve_fn
 
 import logging
@@ -64,19 +65,19 @@ class StorageProviderWrapper(StorageProvider):
 
     Args:
         backend (StorageProvider)
-        store (bool): Whether to store new files or not.
+        store_local (bool): Whether to store new local files or not.
         store_synchronous (bool): Whether to wait for file to be successfully
             uploaded, or todo the upload in the backgroud.
         store_remote (bool): Whether remote media should be uploaded
     """
-    def __init__(self, backend, store, store_synchronous, store_remote):
+    def __init__(self, backend, store_local, store_synchronous, store_remote):
         self.backend = backend
-        self.store = store
+        self.store_local = store_local
         self.store_synchronous = store_synchronous
         self.store_remote = store_remote
 
     def store_file(self, path, file_info):
-        if not self.store:
+        if not file_info.server_name and not self.store_local:
             return defer.succeed(None)
 
         if file_info.server_name and not self.store_remote:
@@ -97,13 +98,13 @@ class FileStorageProviderBackend(StorageProvider):
     """A storage provider that stores files in a directory on a filesystem.
 
     Args:
-        cache_directory (str): Base path of the local media repository
-        base_directory (str): Base path to store new files
+        hs (HomeServer)
+        config: The config returned by `parse_config`.
     """
 
-    def __init__(self, cache_directory, base_directory):
-        self.cache_directory = cache_directory
-        self.base_directory = base_directory
+    def __init__(self, hs, config):
+        self.cache_directory = hs.config.media_store_path
+        self.base_directory = config
 
     def store_file(self, path, file_info):
         """See StorageProvider.store_file"""
@@ -125,3 +126,15 @@ class FileStorageProviderBackend(StorageProvider):
         backup_fname = os.path.join(self.base_directory, path)
         if os.path.isfile(backup_fname):
             return FileResponder(open(backup_fname, "rb"))
+
+    @staticmethod
+    def parse_config(config):
+        """Called on startup to parse config supplied. This should parse
+        the config and raise if there is a problem.
+
+        The returned value is passed into the constructor.
+
+        In this case we only care about a single param, the directory, so let's
+        just pull that out.
+        """
+        return Config.ensure_directory(config["directory"])
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index 49e4af514c..58ada49711 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -67,6 +67,7 @@ class ThumbnailResource(Resource):
                 yield self._respond_local_thumbnail(
                     request, media_id, width, height, method, m_type
                 )
+            self.media_repo.mark_recently_accessed(None, media_id)
         else:
             if self.dynamic_thumbnails:
                 yield self._select_or_generate_remote_thumbnail(
@@ -78,6 +79,7 @@ class ThumbnailResource(Resource):
                     request, server_name, media_id,
                     width, height, method, m_type
                 )
+            self.media_repo.mark_recently_accessed(server_name, media_id)
 
     @defer.inlineCallbacks
     def _respond_local_thumbnail(self, request, media_id, width, height,
diff --git a/synapse/server.py b/synapse/server.py
index 99693071b6..3173aed1d0 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -66,7 +66,7 @@ from synapse.rest.media.v1.media_repository import (
     MediaRepository,
     MediaRepositoryResource,
 )
-from synapse.state import StateHandler
+from synapse.state import StateHandler, StateResolutionHandler
 from synapse.storage import DataStore
 from synapse.streams.events import EventSources
 from synapse.util import Clock
@@ -102,6 +102,7 @@ class HomeServer(object):
         'v1auth',
         'auth',
         'state_handler',
+        'state_resolution_handler',
         'presence_handler',
         'sync_handler',
         'typing_handler',
@@ -224,6 +225,9 @@ class HomeServer(object):
     def build_state_handler(self):
         return StateHandler(self)
 
+    def build_state_resolution_handler(self):
+        return StateResolutionHandler(self)
+
     def build_presence_handler(self):
         return PresenceHandler(self)
 
@@ -307,6 +311,23 @@ class HomeServer(object):
             **self.db_config.get("args", {})
         )
 
+    def get_db_conn(self, run_new_connection=True):
+        """Makes a new connection to the database, skipping the db pool
+
+        Returns:
+            Connection: a connection object implementing the PEP-249 spec
+        """
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
     def build_media_repository_resource(self):
         # build the media repo resource. This indirects through the HomeServer
         # to ensure that we only have a single instance of
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 41416ef252..c3a9a3847b 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -34,6 +34,9 @@ class HomeServer(object):
     def get_state_handler(self) -> synapse.state.StateHandler:
         pass
 
+    def get_state_resolution_handler(self) -> synapse.state.StateResolutionHandler:
+        pass
+
     def get_deactivate_account_handler(self) -> synapse.handlers.deactivate_account.DeactivateAccountHandler:
         pass
 
diff --git a/synapse/state.py b/synapse/state.py
index 9e624b4937..273f9911ca 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -58,7 +58,11 @@ class _StateCacheEntry(object):
     __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"]
 
     def __init__(self, state, state_group, prev_group=None, delta_ids=None):
+        # dict[(str, str), str] map  from (type, state_key) to event_id
         self.state = frozendict(state)
+
+        # the ID of a state group if one and only one is involved.
+        # otherwise, None otherwise?
         self.state_group = state_group
 
         self.prev_group = prev_group
@@ -81,31 +85,19 @@ class _StateCacheEntry(object):
 
 
 class StateHandler(object):
-    """ Responsible for doing state conflict resolution.
+    """Fetches bits of state from the stores, and does state resolution
+    where necessary
     """
 
     def __init__(self, hs):
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.hs = hs
-
-        # dict of set of event_ids -> _StateCacheEntry.
-        self._state_cache = None
-        self.resolve_linearizer = Linearizer(name="state_resolve_lock")
+        self._state_resolution_handler = hs.get_state_resolution_handler()
 
     def start_caching(self):
-        logger.debug("start_caching")
-
-        self._state_cache = ExpiringCache(
-            cache_name="state_cache",
-            clock=self.clock,
-            max_len=SIZE_OF_CACHE,
-            expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000,
-            iterable=True,
-            reset_expiry_on_get=True,
-        )
-
-        self._state_cache.start()
+        # TODO: remove this shim
+        self._state_resolution_handler.start_caching()
 
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key="",
@@ -127,7 +119,7 @@ class StateHandler(object):
             latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
 
         logger.debug("calling resolve_state_groups from get_current_state")
-        ret = yield self.resolve_state_groups(room_id, latest_event_ids)
+        ret = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
         state = ret.state
 
         if event_type:
@@ -146,19 +138,27 @@ class StateHandler(object):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
-    def get_current_state_ids(self, room_id, event_type=None, state_key="",
-                              latest_event_ids=None):
+    def get_current_state_ids(self, room_id, latest_event_ids=None):
+        """Get the current state, or the state at a set of events, for a room
+
+        Args:
+            room_id (str):
+
+            latest_event_ids (iterable[str]|None): if given, the forward
+                extremities to resolve. If None, we look them up from the
+                database (via a cache)
+
+        Returns:
+            Deferred[dict[(str, str), str)]]: the state dict, mapping from
+                (event_type, state_key) -> event_id
+        """
         if not latest_event_ids:
             latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
 
         logger.debug("calling resolve_state_groups from get_current_state_ids")
-        ret = yield self.resolve_state_groups(room_id, latest_event_ids)
+        ret = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
         state = ret.state
 
-        if event_type:
-            defer.returnValue(state.get((event_type, state_key)))
-            return
-
         defer.returnValue(state)
 
     @defer.inlineCallbacks
@@ -166,7 +166,7 @@ class StateHandler(object):
         if not latest_event_ids:
             latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         logger.debug("calling resolve_state_groups from get_current_user_in_room")
-        entry = yield self.resolve_state_groups(room_id, latest_event_ids)
+        entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
         joined_users = yield self.store.get_joined_users_from_state(room_id, entry)
         defer.returnValue(joined_users)
 
@@ -175,7 +175,7 @@ class StateHandler(object):
         if not latest_event_ids:
             latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         logger.debug("calling resolve_state_groups from get_current_hosts_in_room")
-        entry = yield self.resolve_state_groups(room_id, latest_event_ids)
+        entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
         joined_hosts = yield self.store.get_joined_hosts(room_id, entry)
         defer.returnValue(joined_hosts)
 
@@ -233,7 +233,7 @@ class StateHandler(object):
             defer.returnValue(context)
 
         logger.debug("calling resolve_state_groups from compute_event_context")
-        entry = yield self.resolve_state_groups(
+        entry = yield self.resolve_state_groups_for_events(
             event.room_id, [e for e, _ in event.prev_events],
         )
 
@@ -275,16 +275,16 @@ class StateHandler(object):
         defer.returnValue(context)
 
     @defer.inlineCallbacks
-    @log_function
-    def resolve_state_groups(self, room_id, event_ids):
+    def resolve_state_groups_for_events(self, room_id, event_ids):
         """ Given a list of event_ids this method fetches the state at each
         event, resolves conflicts between them and returns them.
 
+        Args:
+            room_id (str):
+            event_ids (list[str]):
+
         Returns:
-            a Deferred tuple of (`state_group`, `state`, `prev_state`).
-            `state_group` is the name of a state group if one and only one is
-            involved. `state` is a map from (type, state_key) to event, and
-            `prev_state` is a list of event ids.
+            Deferred[_StateCacheEntry]: resolved state
         """
         logger.debug("resolve_state_groups event_ids %s", event_ids)
 
@@ -295,13 +295,7 @@ class StateHandler(object):
             room_id, event_ids
         )
 
-        logger.debug(
-            "resolve_state_groups state_groups %s",
-            state_groups_ids.keys()
-        )
-
-        group_names = frozenset(state_groups_ids.keys())
-        if len(group_names) == 1:
+        if len(state_groups_ids) == 1:
             name, state_list = state_groups_ids.items().pop()
 
             prev_group, delta_ids = yield self.store.get_state_group_delta(name)
@@ -313,6 +307,92 @@ class StateHandler(object):
                 delta_ids=delta_ids,
             ))
 
+        result = yield self._state_resolution_handler.resolve_state_groups(
+            room_id, state_groups_ids, self._state_map_factory,
+        )
+        defer.returnValue(result)
+
+    def _state_map_factory(self, ev_ids):
+        return self.store.get_events(
+            ev_ids, get_prev_content=False, check_redacted=False,
+        )
+
+    def resolve_events(self, state_sets, event):
+        logger.info(
+            "Resolving state for %s with %d groups", event.room_id, len(state_sets)
+        )
+        state_set_ids = [{
+            (ev.type, ev.state_key): ev.event_id
+            for ev in st
+        } for st in state_sets]
+
+        state_map = {
+            ev.event_id: ev
+            for st in state_sets
+            for ev in st
+        }
+
+        with Measure(self.clock, "state._resolve_events"):
+            new_state = resolve_events_with_state_map(state_set_ids, state_map)
+
+        new_state = {
+            key: state_map[ev_id] for key, ev_id in new_state.items()
+        }
+
+        return new_state
+
+
+class StateResolutionHandler(object):
+    """Responsible for doing state conflict resolution.
+
+    Note that the storage layer depends on this handler, so all functions must
+    be storage-independent.
+    """
+    def __init__(self, hs):
+        self.clock = hs.get_clock()
+
+        # dict of set of event_ids -> _StateCacheEntry.
+        self._state_cache = None
+        self.resolve_linearizer = Linearizer(name="state_resolve_lock")
+
+    def start_caching(self):
+        logger.debug("start_caching")
+
+        self._state_cache = ExpiringCache(
+            cache_name="state_cache",
+            clock=self.clock,
+            max_len=SIZE_OF_CACHE,
+            expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000,
+            iterable=True,
+            reset_expiry_on_get=True,
+        )
+
+        self._state_cache.start()
+
+    @defer.inlineCallbacks
+    @log_function
+    def resolve_state_groups(self, room_id, state_groups_ids, state_map_factory):
+        """Resolves conflicts between a set of state groups
+
+        Always generates a new state group (unless we hit the cache), so should
+        not be called for a single state group
+
+        Args:
+            room_id (str): room we are resolving for (used for logging)
+            state_groups_ids (dict[int, dict[(str, str), str]]):
+                 map from state group id to the state in that state group
+                (where 'state' is a map from state key to event id)
+
+        Returns:
+            Deferred[_StateCacheEntry]: resolved state
+        """
+        logger.debug(
+            "resolve_state_groups state_groups %s",
+            state_groups_ids.keys()
+        )
+
+        group_names = frozenset(state_groups_ids.keys())
+
         with (yield self.resolve_linearizer.queue(group_names)):
             if self._state_cache is not None:
                 cache = self._state_cache.get(group_names, None)
@@ -341,17 +421,19 @@ class StateHandler(object):
             if conflicted_state:
                 logger.info("Resolving conflicted state for %r", room_id)
                 with Measure(self.clock, "state._resolve_events"):
-                    new_state = yield resolve_events(
+                    new_state = yield resolve_events_with_factory(
                         state_groups_ids.values(),
-                        state_map_factory=lambda ev_ids: self.store.get_events(
-                            ev_ids, get_prev_content=False, check_redacted=False,
-                        ),
+                        state_map_factory=state_map_factory,
                     )
             else:
                 new_state = {
                     key: e_ids.pop() for key, e_ids in state.items()
                 }
 
+            # if the new state matches any of the input state groups, we can
+            # use that state group again. Otherwise we will generate a state_id
+            # which will be used as a cache key for future resolutions, but
+            # not get persisted.
             state_group = None
             new_state_event_ids = frozenset(new_state.values())
             for sg, events in state_groups_ids.items():
@@ -388,30 +470,6 @@ class StateHandler(object):
 
             defer.returnValue(cache)
 
-    def resolve_events(self, state_sets, event):
-        logger.info(
-            "Resolving state for %s with %d groups", event.room_id, len(state_sets)
-        )
-        state_set_ids = [{
-            (ev.type, ev.state_key): ev.event_id
-            for ev in st
-        } for st in state_sets]
-
-        state_map = {
-            ev.event_id: ev
-            for st in state_sets
-            for ev in st
-        }
-
-        with Measure(self.clock, "state._resolve_events"):
-            new_state = resolve_events(state_set_ids, state_map)
-
-        new_state = {
-            key: state_map[ev_id] for key, ev_id in new_state.items()
-        }
-
-        return new_state
-
 
 def _ordered_events(events):
     def key_func(e):
@@ -420,19 +478,17 @@ def _ordered_events(events):
     return sorted(events, key=key_func)
 
 
-def resolve_events(state_sets, state_map_factory):
+def resolve_events_with_state_map(state_sets, state_map):
     """
     Args:
         state_sets(list): List of dicts of (type, state_key) -> event_id,
             which are the different state groups to resolve.
-        state_map_factory(dict|callable): If callable, then will be called
-            with a list of event_ids that are needed, and should return with
-            a Deferred of dict of event_id to event. Otherwise, should be
-            a dict from event_id to event of all events in state_sets.
+        state_map(dict): a dict from event_id to event, for all events in
+            state_sets.
 
     Returns
-        dict[(str, str), synapse.events.FrozenEvent] is a map from
-        (type, state_key) to event.
+        dict[(str, str), str]:
+            a map from (type, state_key) to event_id.
     """
     if len(state_sets) == 1:
         return state_sets[0]
@@ -441,13 +497,6 @@ def resolve_events(state_sets, state_map_factory):
         state_sets,
     )
 
-    if callable(state_map_factory):
-        return _resolve_with_state_fac(
-            unconflicted_state, conflicted_state, state_map_factory
-        )
-
-    state_map = state_map_factory
-
     auth_events = _create_auth_events_from_maps(
         unconflicted_state, conflicted_state, state_map
     )
@@ -461,6 +510,21 @@ def _seperate(state_sets):
     """Takes the state_sets and figures out which keys are conflicted and
     which aren't. i.e., which have multiple different event_ids associated
     with them in different state sets.
+
+    Args:
+        state_sets(list[dict[(str, str), str]]):
+            List of dicts of (type, state_key) -> event_id, which are the
+            different state groups to resolve.
+
+    Returns:
+        (dict[(str, str), str], dict[(str, str), set[str]]):
+            A tuple of (unconflicted_state, conflicted_state), where:
+
+            unconflicted_state is a dict mapping (type, state_key)->event_id
+            for unconflicted state keys.
+
+            conflicted_state is a dict mapping (type, state_key) to a set of
+            event ids for conflicted state keys.
     """
     unconflicted_state = dict(state_sets[0])
     conflicted_state = {}
@@ -491,8 +555,26 @@ def _seperate(state_sets):
 
 
 @defer.inlineCallbacks
-def _resolve_with_state_fac(unconflicted_state, conflicted_state,
-                            state_map_factory):
+def resolve_events_with_factory(state_sets, state_map_factory):
+    """
+    Args:
+        state_sets(list): List of dicts of (type, state_key) -> event_id,
+            which are the different state groups to resolve.
+        state_map_factory(func): will be called
+            with a list of event_ids that are needed, and should return with
+            a Deferred of dict of event_id to event.
+
+    Returns
+        Deferred[dict[(str, str), str]]:
+            a map from (type, state_key) to event_id.
+    """
+    if len(state_sets) == 1:
+        defer.returnValue(state_sets[0])
+
+    unconflicted_state, conflicted_state = _seperate(
+        state_sets,
+    )
+
     needed_events = set(
         event_id
         for event_ids in conflicted_state.itervalues()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b971f0cb18..68125006eb 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -291,33 +291,33 @@ class SQLBaseStore(object):
 
     @defer.inlineCallbacks
     def runInteraction(self, desc, func, *args, **kwargs):
-        """Wraps the .runInteraction() method on the underlying db_pool."""
-        current_context = LoggingContext.current_context()
+        """Starts a transaction on the database and runs a given function
 
-        start_time = time.time() * 1000
+        Arguments:
+            desc (str): description of the transaction, for logging and metrics
+            func (func): callback function, which will be called with a
+                database transaction (twisted.enterprise.adbapi.Transaction) as
+                its first argument, followed by `args` and `kwargs`.
+
+            args (list): positional args to pass to `func`
+            kwargs (dict): named args to pass to `func`
+
+        Returns:
+            Deferred: The result of func
+        """
+        current_context = LoggingContext.current_context()
 
         after_callbacks = []
         final_callbacks = []
 
         def inner_func(conn, *args, **kwargs):
-            with LoggingContext("runInteraction") as context:
-                sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
-
-                if self.database_engine.is_connection_closed(conn):
-                    logger.debug("Reconnecting closed database connection")
-                    conn.reconnect()
-
-                current_context.copy_to(context)
-                return self._new_transaction(
-                    conn, desc, after_callbacks, final_callbacks, current_context,
-                    func, *args, **kwargs
-                )
+            return self._new_transaction(
+                conn, desc, after_callbacks, final_callbacks, current_context,
+                func, *args, **kwargs
+            )
 
         try:
-            with PreserveLoggingContext():
-                result = yield self._db_pool.runWithConnection(
-                    inner_func, *args, **kwargs
-                )
+            result = yield self.runWithConnection(inner_func, *args, **kwargs)
 
             for after_callback, after_args, after_kwargs in after_callbacks:
                 after_callback(*after_args, **after_kwargs)
@@ -329,14 +329,27 @@ class SQLBaseStore(object):
 
     @defer.inlineCallbacks
     def runWithConnection(self, func, *args, **kwargs):
-        """Wraps the .runInteraction() method on the underlying db_pool."""
+        """Wraps the .runWithConnection() method on the underlying db_pool.
+
+        Arguments:
+            func (func): callback function, which will be called with a
+                database connection (twisted.enterprise.adbapi.Connection) as
+                its first argument, followed by `args` and `kwargs`.
+            args (list): positional args to pass to `func`
+            kwargs (dict): named args to pass to `func`
+
+        Returns:
+            Deferred: The result of func
+        """
         current_context = LoggingContext.current_context()
 
         start_time = time.time() * 1000
 
         def inner_func(conn, *args, **kwargs):
             with LoggingContext("runWithConnection") as context:
-                sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
+                sched_duration_ms = time.time() * 1000 - start_time
+                sql_scheduling_timer.inc_by(sched_duration_ms)
+                current_context.add_database_scheduled(sched_duration_ms)
 
                 if self.database_engine.is_connection_closed(conn):
                     logger.debug("Reconnecting closed database connection")
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ad1d782705..dd28c2efe3 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -27,7 +27,7 @@ from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
-from synapse.state import resolve_events
+from synapse.state import resolve_events_with_factory
 from synapse.util.caches.descriptors import cached
 from synapse.types import get_domain_from_id
 
@@ -110,7 +110,7 @@ class _EventPeristenceQueue(object):
                 end_item.events_and_contexts.extend(events_and_contexts)
                 return end_item.deferred.observe()
 
-        deferred = ObservableDeferred(defer.Deferred())
+        deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
 
         queue.append(self._EventPersistQueueItem(
             events_and_contexts=events_and_contexts,
@@ -152,8 +152,8 @@ class _EventPeristenceQueue(object):
                     try:
                         ret = yield per_item_callback(item)
                         item.deferred.callback(ret)
-                    except Exception as e:
-                        item.deferred.errback(e)
+                    except Exception:
+                        item.deferred.errback()
             finally:
                 queue = self._event_persist_queues.pop(room_id, None)
                 if queue:
@@ -386,11 +386,18 @@ class EventsStore(SQLBaseStore):
                                 if all_single_prev_not_state:
                                     continue
 
-                            state = yield self._calculate_state_delta(
-                                room_id, ev_ctx_rm, new_latest_event_ids
+                            logger.info(
+                                "Calculating state delta for room %s", room_id,
                             )
-                            if state:
-                                current_state_for_room[room_id] = state
+                            current_state = yield self._get_new_state_after_events(
+                                ev_ctx_rm, new_latest_event_ids,
+                            )
+                            if current_state is not None:
+                                delta = yield self._calculate_state_delta(
+                                    room_id, current_state,
+                                )
+                                if delta is not None:
+                                    current_state_for_room[room_id] = delta
 
                 yield self.runInteraction(
                     "persist_events",
@@ -467,20 +474,22 @@ class EventsStore(SQLBaseStore):
         defer.returnValue(new_latest_event_ids)
 
     @defer.inlineCallbacks
-    def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
-        """Calculate the new state deltas for a room.
+    def _get_new_state_after_events(self, events_context, new_latest_event_ids):
+        """Calculate the current state dict after adding some new events to
+        a room
 
-        Assumes that we are only persisting events for one room at a time.
+        Args:
+            events_context (list[(EventBase, EventContext)]):
+                events and contexts which are being added to the room
+
+            new_latest_event_ids (iterable[str]):
+                the new forward extremities for the room.
 
         Returns:
-            3-tuple (to_delete, to_insert, new_state) where both are state dicts,
-            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
-            first be deleted from current_state_events, `to_insert` are entries
-            to insert. `new_state` is the full set of state.
-            May return None if there are no changes to be applied.
+            Deferred[dict[(str,str), str]|None]:
+                None if there are no changes to the room state, or
+                a dict of (type, state_key) -> event_id].
         """
-        # Now we need to work out the different state sets for
-        # each state extremities
         state_sets = []
         state_groups = set()
         missing_event_ids = []
@@ -523,18 +532,23 @@ class EventsStore(SQLBaseStore):
                 state_sets.extend(group_to_state.itervalues())
 
         if not new_latest_event_ids:
-            current_state = {}
+            defer.returnValue({})
         elif was_updated:
             if len(state_sets) == 1:
                 # If there is only one state set, then we know what the current
                 # state is.
-                current_state = state_sets[0]
+                defer.returnValue(state_sets[0])
             else:
                 # We work out the current state by passing the state sets to the
                 # state resolution algorithm. It may ask for some events, including
                 # the events we have yet to persist, so we need a slightly more
                 # complicated event lookup function than simply looking the events
                 # up in the db.
+
+                logger.info(
+                    "Resolving state with %i state sets", len(state_sets),
+                )
+
                 events_map = {ev.event_id: ev for ev, _ in events_context}
 
                 @defer.inlineCallbacks
@@ -557,13 +571,26 @@ class EventsStore(SQLBaseStore):
                         to_return.update(evs)
                     defer.returnValue(to_return)
 
-                current_state = yield resolve_events(
+                current_state = yield resolve_events_with_factory(
                     state_sets,
                     state_map_factory=get_events,
                 )
+                defer.returnValue(current_state)
         else:
             return
 
+    @defer.inlineCallbacks
+    def _calculate_state_delta(self, room_id, current_state):
+        """Calculate the new state deltas for a room.
+
+        Assumes that we are only persisting events for one room at a time.
+
+        Returns:
+            3-tuple (to_delete, to_insert, new_state) where both are state dicts,
+            i.e. (type, state_key) -> event_id. `to_delete` are the entries to
+            first be deleted from current_state_events, `to_insert` are entries
+            to insert. `new_state` is the full set of state.
+        """
         existing_state = yield self.get_current_state_ids(room_id)
 
         existing_events = set(existing_state.itervalues())
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 6ebc372498..e6cdbb0545 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -173,7 +173,14 @@ class MediaRepositoryStore(BackgroundUpdateStore):
             desc="store_cached_remote_media",
         )
 
-    def update_cached_last_access_time(self, origin_id_tuples, time_ts):
+    def update_cached_last_access_time(self, local_media, remote_media, time_ms):
+        """Updates the last access time of the given media
+
+        Args:
+            local_media (iterable[str]): Set of media_ids
+            remote_media (iterable[(str, str)]): Set of (server_name, media_id)
+            time_ms: Current time in milliseconds
+        """
         def update_cache_txn(txn):
             sql = (
                 "UPDATE remote_media_cache SET last_access_ts = ?"
@@ -181,8 +188,18 @@ class MediaRepositoryStore(BackgroundUpdateStore):
             )
 
             txn.executemany(sql, (
-                (time_ts, media_origin, media_id)
-                for media_origin, media_id in origin_id_tuples
+                (time_ms, media_origin, media_id)
+                for media_origin, media_id in remote_media
+            ))
+
+            sql = (
+                "UPDATE local_media_repository SET last_access_ts = ?"
+                " WHERE media_id = ?"
+            )
+
+            txn.executemany(sql, (
+                (time_ms, media_id)
+                for media_id in local_media
             ))
 
         return self.runInteraction("update_cached_last_access_time", update_cache_txn)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index d1691bbac2..c845a0cec5 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 46
+SCHEMA_VERSION = 47
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 23688430b7..cf2c4dae39 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -533,73 +533,114 @@ class RoomStore(SQLBaseStore):
         )
         self.is_room_blocked.invalidate((room_id,))
 
+    def get_media_mxcs_in_room(self, room_id):
+        """Retrieves all the local and remote media MXC URIs in a given room
+
+        Args:
+            room_id (str)
+
+        Returns:
+            The local and remote media as a lists of tuples where the key is
+            the hostname and the value is the media ID.
+        """
+        def _get_media_mxcs_in_room_txn(txn):
+            local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
+            local_media_mxcs = []
+            remote_media_mxcs = []
+
+            # Convert the IDs to MXC URIs
+            for media_id in local_mxcs:
+                local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+            for hostname, media_id in remote_mxcs:
+                remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
+
+            return local_media_mxcs, remote_media_mxcs
+        return self.runInteraction("get_media_ids_in_room", _get_media_mxcs_in_room_txn)
+
     def quarantine_media_ids_in_room(self, room_id, quarantined_by):
         """For a room loops through all events with media and quarantines
         the associated media
         """
-        def _get_media_ids_in_room(txn):
-            mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
+        def _quarantine_media_in_room_txn(txn):
+            local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
+            total_media_quarantined = 0
 
-            next_token = self.get_current_events_token() + 1
+            # Now update all the tables to set the quarantined_by flag
 
-            total_media_quarantined = 0
+            txn.executemany("""
+                UPDATE local_media_repository
+                SET quarantined_by = ?
+                WHERE media_id = ?
+            """, ((quarantined_by, media_id) for media_id in local_mxcs))
 
-            while next_token:
-                sql = """
-                    SELECT stream_ordering, content FROM events
-                    WHERE room_id = ?
-                        AND stream_ordering < ?
-                        AND contains_url = ? AND outlier = ?
-                    ORDER BY stream_ordering DESC
-                    LIMIT ?
+            txn.executemany(
                 """
-                txn.execute(sql, (room_id, next_token, True, False, 100))
-
-                next_token = None
-                local_media_mxcs = []
-                remote_media_mxcs = []
-                for stream_ordering, content_json in txn:
-                    next_token = stream_ordering
-                    content = json.loads(content_json)
-
-                    content_url = content.get("url")
-                    thumbnail_url = content.get("info", {}).get("thumbnail_url")
-
-                    for url in (content_url, thumbnail_url):
-                        if not url:
-                            continue
-                        matches = mxc_re.match(url)
-                        if matches:
-                            hostname = matches.group(1)
-                            media_id = matches.group(2)
-                            if hostname == self.hostname:
-                                local_media_mxcs.append(media_id)
-                            else:
-                                remote_media_mxcs.append((hostname, media_id))
-
-                # Now update all the tables to set the quarantined_by flag
-
-                txn.executemany("""
-                    UPDATE local_media_repository
+                    UPDATE remote_media_cache
                     SET quarantined_by = ?
-                    WHERE media_id = ?
-                """, ((quarantined_by, media_id) for media_id in local_media_mxcs))
-
-                txn.executemany(
-                    """
-                        UPDATE remote_media_cache
-                        SET quarantined_by = ?
-                        WHERE media_origin AND media_id = ?
-                    """,
-                    (
-                        (quarantined_by, origin, media_id)
-                        for origin, media_id in remote_media_mxcs
-                    )
+                    WHERE media_origin = ? AND media_id = ?
+                """,
+                (
+                    (quarantined_by, origin, media_id)
+                    for origin, media_id in remote_mxcs
                 )
+            )
 
-                total_media_quarantined += len(local_media_mxcs)
-                total_media_quarantined += len(remote_media_mxcs)
+            total_media_quarantined += len(local_mxcs)
+            total_media_quarantined += len(remote_mxcs)
 
             return total_media_quarantined
 
-        return self.runInteraction("get_media_ids_in_room", _get_media_ids_in_room)
+        return self.runInteraction(
+            "quarantine_media_in_room",
+            _quarantine_media_in_room_txn,
+        )
+
+    def _get_media_mxcs_in_room_txn(self, txn, room_id):
+        """Retrieves all the local and remote media MXC URIs in a given room
+
+        Args:
+            txn (cursor)
+            room_id (str)
+
+        Returns:
+            The local and remote media as a lists of tuples where the key is
+            the hostname and the value is the media ID.
+        """
+        mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
+
+        next_token = self.get_current_events_token() + 1
+        local_media_mxcs = []
+        remote_media_mxcs = []
+
+        while next_token:
+            sql = """
+                SELECT stream_ordering, content FROM events
+                WHERE room_id = ?
+                    AND stream_ordering < ?
+                    AND contains_url = ? AND outlier = ?
+                ORDER BY stream_ordering DESC
+                LIMIT ?
+            """
+            txn.execute(sql, (room_id, next_token, True, False, 100))
+
+            next_token = None
+            for stream_ordering, content_json in txn:
+                next_token = stream_ordering
+                content = json.loads(content_json)
+
+                content_url = content.get("url")
+                thumbnail_url = content.get("info", {}).get("thumbnail_url")
+
+                for url in (content_url, thumbnail_url):
+                    if not url:
+                        continue
+                    matches = mxc_re.match(url)
+                    if matches:
+                        hostname = matches.group(1)
+                        media_id = matches.group(2)
+                        if hostname == self.hostname:
+                            local_media_mxcs.append(media_id)
+                        else:
+                            remote_media_mxcs.append((hostname, media_id))
+
+        return local_media_mxcs, remote_media_mxcs
diff --git a/synapse/storage/schema/delta/47/last_access_media.sql b/synapse/storage/schema/delta/47/last_access_media.sql
new file mode 100644
index 0000000000..f505fb22b5
--- /dev/null
+++ b/synapse/storage/schema/delta/47/last_access_media.sql
@@ -0,0 +1,16 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE local_media_repository ADD COLUMN last_access_ts BIGINT;
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index c9bff408ef..dfdcbb3181 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -641,8 +641,12 @@ class UserDirectoryStore(SQLBaseStore):
         """
 
         if self.hs.config.user_directory_search_all_users:
-            join_clause = ""
-            where_clause = "?<>''"  # naughty hack to keep the same number of binds
+            # make s.user_id null to keep the ordering algorithm happy
+            join_clause = """
+                CROSS JOIN (SELECT NULL as user_id) AS s
+            """
+            join_args = ()
+            where_clause = "1=1"
         else:
             join_clause = """
                 LEFT JOIN users_in_public_rooms AS p USING (user_id)
@@ -651,6 +655,7 @@ class UserDirectoryStore(SQLBaseStore):
                     WHERE user_id = ? AND share_private
                 ) AS s USING (user_id)
             """
+            join_args = (user_id,)
             where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)"
 
         if isinstance(self.database_engine, PostgresEngine):
@@ -692,7 +697,7 @@ class UserDirectoryStore(SQLBaseStore):
                     avatar_url IS NULL
                 LIMIT ?
             """ % (join_clause, where_clause)
-            args = (user_id, full_query, exact_query, prefix_query, limit + 1,)
+            args = join_args + (full_query, exact_query, prefix_query, limit + 1,)
         elif isinstance(self.database_engine, Sqlite3Engine):
             search_query = _parse_query_sqlite(search_term)
 
@@ -710,7 +715,7 @@ class UserDirectoryStore(SQLBaseStore):
                     avatar_url IS NULL
                 LIMIT ?
             """ % (join_clause, where_clause)
-            args = (user_id, search_query, limit + 1)
+            args = join_args + (search_query, limit + 1)
         else:
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 48c9f6802d..94fa7cac98 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -52,13 +52,17 @@ except Exception:
 class LoggingContext(object):
     """Additional context for log formatting. Contexts are scoped within a
     "with" block.
+
     Args:
         name (str): Name for the context for debugging.
     """
 
     __slots__ = [
-        "previous_context", "name", "usage_start", "usage_end", "main_thread",
-        "__dict__", "tag", "alive",
+        "previous_context", "name", "ru_stime", "ru_utime",
+        "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
+        "usage_start", "usage_end",
+        "main_thread", "alive",
+        "request", "tag",
     ]
 
     thread_local = threading.local()
@@ -83,6 +87,9 @@ class LoggingContext(object):
         def add_database_transaction(self, duration_ms):
             pass
 
+        def add_database_scheduled(self, sched_ms):
+            pass
+
         def __nonzero__(self):
             return False
 
@@ -94,9 +101,17 @@ class LoggingContext(object):
         self.ru_stime = 0.
         self.ru_utime = 0.
         self.db_txn_count = 0
-        self.db_txn_duration = 0.
+
+        # ms spent waiting for db txns, excluding scheduling time
+        self.db_txn_duration_ms = 0
+
+        # ms spent waiting for db txns to be scheduled
+        self.db_sched_duration_ms = 0
+
         self.usage_start = None
+        self.usage_end = None
         self.main_thread = threading.current_thread()
+        self.request = None
         self.tag = ""
         self.alive = True
 
@@ -105,7 +120,11 @@ class LoggingContext(object):
 
     @classmethod
     def current_context(cls):
-        """Get the current logging context from thread local storage"""
+        """Get the current logging context from thread local storage
+
+        Returns:
+            LoggingContext: the current logging context
+        """
         return getattr(cls.thread_local, "current_context", cls.sentinel)
 
     @classmethod
@@ -155,11 +174,13 @@ class LoggingContext(object):
         self.alive = False
 
     def copy_to(self, record):
-        """Copy fields from this context to the record"""
-        for key, value in self.__dict__.items():
-            setattr(record, key, value)
+        """Copy logging fields from this context to a log record or
+        another LoggingContext
+        """
 
-        record.ru_utime, record.ru_stime = self.get_resource_usage()
+        # 'request' is the only field we currently use in the logger, so that's
+        # all we need to copy
+        record.request = self.request
 
     def start(self):
         if threading.current_thread() is not self.main_thread:
@@ -194,7 +215,16 @@ class LoggingContext(object):
 
     def add_database_transaction(self, duration_ms):
         self.db_txn_count += 1
-        self.db_txn_duration += duration_ms / 1000.
+        self.db_txn_duration_ms += duration_ms
+
+    def add_database_scheduled(self, sched_ms):
+        """Record a use of the database pool
+
+        Args:
+            sched_ms (int): number of milliseconds it took us to get a
+                connection
+        """
+        self.db_sched_duration_ms += sched_ms
 
 
 class LoggingContextFilter(logging.Filter):
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 8d22ff3068..e4b5687a4b 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -28,7 +28,7 @@ logger = logging.getLogger(__name__)
 metrics = synapse.metrics.get_metrics_for(__name__)
 
 # total number of times we have hit this block
-response_count = metrics.register_counter(
+block_counter = metrics.register_counter(
     "block_count",
     labels=["block_name"],
     alternative_names=(
@@ -72,13 +72,19 @@ block_db_txn_count = metrics.register_counter(
     ),
 )
 
+# seconds spent waiting for db txns, excluding scheduling time, in this block
 block_db_txn_duration = metrics.register_counter(
     "block_db_txn_duration_seconds", labels=["block_name"],
     alternative_names=(
-        metrics.name_prefix + "_block_db_txn_count:total",
+        metrics.name_prefix + "_block_db_txn_duration:total",
     ),
 )
 
+# seconds spent waiting for a db connection, in this block
+block_db_sched_duration = metrics.register_counter(
+    "block_db_sched_duration_seconds", labels=["block_name"],
+)
+
 
 def measure_func(name):
     def wrapper(func):
@@ -95,7 +101,9 @@ def measure_func(name):
 class Measure(object):
     __slots__ = [
         "clock", "name", "start_context", "start", "new_context", "ru_utime",
-        "ru_stime", "db_txn_count", "db_txn_duration", "created_context"
+        "ru_stime",
+        "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
+        "created_context",
     ]
 
     def __init__(self, clock, name):
@@ -115,13 +123,16 @@ class Measure(object):
 
         self.ru_utime, self.ru_stime = self.start_context.get_resource_usage()
         self.db_txn_count = self.start_context.db_txn_count
-        self.db_txn_duration = self.start_context.db_txn_duration
+        self.db_txn_duration_ms = self.start_context.db_txn_duration_ms
+        self.db_sched_duration_ms = self.start_context.db_sched_duration_ms
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         if isinstance(exc_type, Exception) or not self.start_context:
             return
 
         duration = self.clock.time_msec() - self.start
+
+        block_counter.inc(self.name)
         block_timer.inc_by(duration, self.name)
 
         context = LoggingContext.current_context()
@@ -145,7 +156,12 @@ class Measure(object):
             context.db_txn_count - self.db_txn_count, self.name
         )
         block_db_txn_duration.inc_by(
-            context.db_txn_duration - self.db_txn_duration, self.name
+            (context.db_txn_duration_ms - self.db_txn_duration_ms) / 1000.,
+            self.name
+        )
+        block_db_sched_duration.inc_by(
+            (context.db_sched_duration_ms - self.db_sched_duration_ms) / 1000.,
+            self.name
         )
 
         if self.created_context:
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 1adedbb361..47b0bb5eb3 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -26,6 +26,18 @@ logger = logging.getLogger(__name__)
 
 class NotRetryingDestination(Exception):
     def __init__(self, retry_last_ts, retry_interval, destination):
+        """Raised by the limiter (and federation client) to indicate that we are
+        are deliberately not attempting to contact a given server.
+
+        Args:
+            retry_last_ts (int): the unix ts in milliseconds of our last attempt
+                to contact the server.  0 indicates that the last attempt was
+                successful or that we've never actually attempted to connect.
+            retry_interval (int): the time in milliseconds to wait until the next
+                attempt.
+            destination (str): the domain in question
+        """
+
         msg = "Not retrying server %s." % (destination,)
         super(NotRetryingDestination, self).__init__(msg)
 
diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py
new file mode 100644
index 0000000000..75efa0117b
--- /dev/null
+++ b/synapse/util/threepids.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+
+logger = logging.getLogger(__name__)
+
+
+def check_3pid_allowed(hs, medium, address):
+    """Checks whether a given format of 3PID is allowed to be used on this HS
+
+    Args:
+        hs (synapse.server.HomeServer): server
+        medium (str): 3pid medium - e.g. email, msisdn
+        address (str): address within that medium (e.g. "wotan@matrix.org")
+            msisdns need to first have been canonicalised
+    Returns:
+        bool: whether the 3PID medium/address is allowed to be added to this HS
+    """
+
+    if hs.config.allowed_local_3pids:
+        for constraint in hs.config.allowed_local_3pids:
+            logger.debug(
+                "Checking 3PID %s (%s) against %s (%s)",
+                address, medium, constraint['pattern'], constraint['medium'],
+            )
+            if (
+                medium == constraint['medium'] and
+                re.match(constraint['pattern'], address)
+            ):
+                return True
+    else:
+        return True
+
+    return False