summary refs log tree commit diff
diff options
context:
space:
mode:
-rwxr-xr-xscripts/move_remote_media_to_new_store.py133
-rw-r--r--synapse/api/errors.py27
-rw-r--r--synapse/app/appservice.py13
-rw-r--r--synapse/app/client_reader.py13
-rw-r--r--synapse/app/federation_reader.py13
-rw-r--r--synapse/app/federation_sender.py13
-rw-r--r--synapse/app/frontend_proxy.py13
-rwxr-xr-xsynapse/app/homeserver.py13
-rw-r--r--synapse/app/media_repository.py13
-rw-r--r--synapse/app/pusher.py13
-rw-r--r--synapse/app/synchrotron.py13
-rw-r--r--synapse/app/user_dir.py13
-rw-r--r--synapse/config/registration.py19
-rw-r--r--synapse/config/repository.py87
-rw-r--r--synapse/config/server.py22
-rw-r--r--synapse/federation/federation_client.py5
-rw-r--r--synapse/federation/transaction_queue.py4
-rw-r--r--synapse/federation/transport/client.py3
-rw-r--r--synapse/federation/transport/server.py9
-rw-r--r--synapse/handlers/device.py4
-rw-r--r--synapse/handlers/e2e_keys.py8
-rw-r--r--synapse/handlers/federation.py4
-rw-r--r--synapse/handlers/register.py8
-rw-r--r--synapse/handlers/room_list.py3
-rw-r--r--synapse/http/matrixfederationclient.py28
-rw-r--r--synapse/metrics/__init__.py7
-rw-r--r--synapse/rest/client/v1/register.py34
-rw-r--r--synapse/rest/client/v1/room.py17
-rw-r--r--synapse/rest/client/v2_alpha/account.py21
-rw-r--r--synapse/rest/client/v2_alpha/register.py69
-rw-r--r--synapse/rest/key/v2/remote_key_resource.py8
-rw-r--r--synapse/rest/media/v1/media_repository.py37
-rw-r--r--synapse/rest/media/v1/media_storage.py8
-rw-r--r--synapse/rest/media/v1/storage_provider.py31
-rw-r--r--synapse/rest/media/v1/thumbnail_resource.py4
-rw-r--r--synapse/server.py17
-rw-r--r--synapse/storage/user_directory.py14
-rw-r--r--synapse/util/file_consumer.py139
-rw-r--r--synapse/util/retryutils.py12
-rw-r--r--synapse/util/threepids.py48
-rw-r--r--tests/crypto/test_keyring.py4
-rw-r--r--tests/handlers/test_e2e_keys.py1
-rw-r--r--tests/replication/slave/storage/_base.py8
-rw-r--r--tests/rest/client/v2_alpha/test_register.py1
-rw-r--r--tests/storage/test_user_directory.py88
-rw-r--r--tests/util/test_file_consumer.py176
-rw-r--r--tests/utils.py245
47 files changed, 1081 insertions, 402 deletions
diff --git a/scripts/move_remote_media_to_new_store.py b/scripts/move_remote_media_to_new_store.py
new file mode 100755
index 0000000000..7914ead889
--- /dev/null
+++ b/scripts/move_remote_media_to_new_store.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Moves a list of remote media from one media store to another.
+
+The input should be a list of media files to be moved, one per line. Each line
+should be formatted::
+
+    <origin server>|<file id>
+
+This can be extracted from postgres with::
+
+    psql --tuples-only -A -c "select media_origin, filesystem_id from
+        matrix.remote_media_cache where ..."
+
+To use, pipe the above into::
+
+    PYTHON_PATH=. ./scripts/move_remote_media_to_new_store.py <source repo> <dest repo>
+"""
+
+from __future__ import print_function
+
+import argparse
+import logging
+
+import sys
+
+import os
+
+import shutil
+
+from synapse.rest.media.v1.filepath import MediaFilePaths
+
+logger = logging.getLogger()
+
+
+def main(src_repo, dest_repo):
+    src_paths = MediaFilePaths(src_repo)
+    dest_paths = MediaFilePaths(dest_repo)
+    for line in sys.stdin:
+        line = line.strip()
+        parts = line.split('|')
+        if len(parts) != 2:
+            print("Unable to parse input line %s" % line, file=sys.stderr)
+            exit(1)
+
+        move_media(parts[0], parts[1], src_paths, dest_paths)
+
+
+def move_media(origin_server, file_id, src_paths, dest_paths):
+    """Move the given file, and any thumbnails, to the dest repo
+
+    Args:
+        origin_server (str):
+        file_id (str):
+        src_paths (MediaFilePaths):
+        dest_paths (MediaFilePaths):
+    """
+    logger.info("%s/%s", origin_server, file_id)
+
+    # check that the original exists
+    original_file = src_paths.remote_media_filepath(origin_server, file_id)
+    if not os.path.exists(original_file):
+        logger.warn(
+            "Original for %s/%s (%s) does not exist",
+            origin_server, file_id, original_file,
+        )
+    else:
+        mkdir_and_move(
+            original_file,
+            dest_paths.remote_media_filepath(origin_server, file_id),
+        )
+
+    # now look for thumbnails
+    original_thumb_dir = src_paths.remote_media_thumbnail_dir(
+        origin_server, file_id,
+    )
+    if not os.path.exists(original_thumb_dir):
+        return
+
+    mkdir_and_move(
+        original_thumb_dir,
+        dest_paths.remote_media_thumbnail_dir(origin_server, file_id)
+    )
+
+
+def mkdir_and_move(original_file, dest_file):
+    dirname = os.path.dirname(dest_file)
+    if not os.path.exists(dirname):
+        logger.debug("mkdir %s", dirname)
+        os.makedirs(dirname)
+    logger.debug("mv %s %s", original_file, dest_file)
+    shutil.move(original_file, dest_file)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class = argparse.RawDescriptionHelpFormatter,
+    )
+    parser.add_argument(
+        "-v", action='store_true', help='enable debug logging')
+    parser.add_argument(
+        "src_repo",
+        help="Path to source content repo",
+    )
+    parser.add_argument(
+        "dest_repo",
+        help="Path to source content repo",
+    )
+    args = parser.parse_args()
+
+    logging_config = {
+        "level": logging.DEBUG if args.v else logging.INFO,
+        "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
+    }
+    logging.basicConfig(**logging_config)
+
+    main(args.src_repo, args.dest_repo)
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 79b35b3e7c..aa15f73f36 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -46,6 +46,7 @@ class Codes(object):
     THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED"
     THREEPID_IN_USE = "M_THREEPID_IN_USE"
     THREEPID_NOT_FOUND = "M_THREEPID_NOT_FOUND"
+    THREEPID_DENIED = "M_THREEPID_DENIED"
     INVALID_USERNAME = "M_INVALID_USERNAME"
     SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
 
@@ -140,6 +141,32 @@ class RegistrationError(SynapseError):
     pass
 
 
+class FederationDeniedError(SynapseError):
+    """An error raised when the server tries to federate with a server which
+    is not on its federation whitelist.
+
+    Attributes:
+        destination (str): The destination which has been denied
+    """
+
+    def __init__(self, destination):
+        """Raised by federation client or server to indicate that we are
+        are deliberately not attempting to contact a given server because it is
+        not on our federation whitelist.
+
+        Args:
+            destination (str): the domain in question
+        """
+
+        self.destination = destination
+
+        super(FederationDeniedError, self).__init__(
+            code=403,
+            msg="Federation denied with %s." % (self.destination,),
+            errcode=Codes.FORBIDDEN,
+        )
+
+
 class InteractiveAuthIncompleteError(Exception):
     """An error raised when UI auth is not yet complete
 
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 7d0c2879ae..c6fe4516d1 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -49,19 +49,6 @@ class AppserviceSlaveStore(
 
 
 class AppserviceServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index dc3f6efd43..3b3352798d 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -64,19 +64,6 @@ class ClientReaderSlavedStore(
 
 
 class ClientReaderServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index a072291e1f..4de43c41f0 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -58,19 +58,6 @@ class FederationReaderSlavedStore(
 
 
 class FederationReaderServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 09e9488f06..f760826d27 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -76,19 +76,6 @@ class FederationSenderSlaveStore(
 
 
 class FederationSenderServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index ae531c0aa4..e32ee8fe93 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -118,19 +118,6 @@ class FrontendProxySlavedStore(
 
 
 class FrontendProxyServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 92ab3b311b..cb82a415a6 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -266,19 +266,6 @@ class SynapseHomeServer(HomeServer):
         except IncorrectDatabaseSetup as e:
             quit_with_error(e.message)
 
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
 
 def setup(config_options):
     """
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index eab1597aaa..1ed1ca8772 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -60,19 +60,6 @@ class MediaRepositorySlavedStore(
 
 
 class MediaRepositoryServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 7fbbb0b0e1..32ccea3f13 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -81,19 +81,6 @@ class PusherSlaveStore(
 
 
 class PusherServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = PusherSlaveStore(self.get_db_conn(), self)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 0abba3016e..f87531f1b6 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -246,19 +246,6 @@ class SynchrotronApplicationService(object):
 
 
 class SynchrotronServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index a48c4a2ae6..494ccb702c 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -92,19 +92,6 @@ class UserDirectorySlaveStore(
 
 
 class UserDirectoryServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index ef917fc9f2..336959094b 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -31,6 +31,8 @@ class RegistrationConfig(Config):
                 strtobool(str(config["disable_registration"]))
             )
 
+        self.registrations_require_3pid = config.get("registrations_require_3pid", [])
+        self.allowed_local_3pids = config.get("allowed_local_3pids", [])
         self.registration_shared_secret = config.get("registration_shared_secret")
 
         self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
@@ -52,6 +54,23 @@ class RegistrationConfig(Config):
         # Enable registration for new users.
         enable_registration: False
 
+        # The user must provide all of the below types of 3PID when registering.
+        #
+        # registrations_require_3pid:
+        #     - email
+        #     - msisdn
+
+        # Mandate that users are only allowed to associate certain formats of
+        # 3PIDs with accounts on this server.
+        #
+        # allowed_local_3pids:
+        #     - medium: email
+        #       pattern: ".*@matrix\\.org"
+        #     - medium: email
+        #       pattern: ".*@vector\\.im"
+        #     - medium: msisdn
+        #       pattern: "\\+44"
+
         # If set, allows registration by anyone who also has the shared
         # secret, even if registration is otherwise disabled.
         registration_shared_secret: "%(registration_shared_secret)s"
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 6baa474931..25ea77738a 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -16,6 +16,8 @@
 from ._base import Config, ConfigError
 from collections import namedtuple
 
+from synapse.util.module_loader import load_module
+
 
 MISSING_NETADDR = (
     "Missing netaddr library. This is required for URL preview API."
@@ -36,6 +38,14 @@ ThumbnailRequirement = namedtuple(
     "ThumbnailRequirement", ["width", "height", "method", "media_type"]
 )
 
+MediaStorageProviderConfig = namedtuple(
+    "MediaStorageProviderConfig", (
+        "store_local",  # Whether to store newly uploaded local files
+        "store_remote",  # Whether to store newly downloaded remote files
+        "store_synchronous",  # Whether to wait for successful storage for local uploads
+    ),
+)
+
 
 def parse_thumbnail_requirements(thumbnail_sizes):
     """ Takes a list of dictionaries with "width", "height", and "method" keys
@@ -73,16 +83,61 @@ class ContentRepositoryConfig(Config):
 
         self.media_store_path = self.ensure_directory(config["media_store_path"])
 
-        self.backup_media_store_path = config.get("backup_media_store_path")
-        if self.backup_media_store_path:
-            self.backup_media_store_path = self.ensure_directory(
-                self.backup_media_store_path
-            )
+        backup_media_store_path = config.get("backup_media_store_path")
 
-        self.synchronous_backup_media_store = config.get(
+        synchronous_backup_media_store = config.get(
             "synchronous_backup_media_store", False
         )
 
+        storage_providers = config.get("media_storage_providers", [])
+
+        if backup_media_store_path:
+            if storage_providers:
+                raise ConfigError(
+                    "Cannot use both 'backup_media_store_path' and 'storage_providers'"
+                )
+
+            storage_providers = [{
+                "module": "file_system",
+                "store_local": True,
+                "store_synchronous": synchronous_backup_media_store,
+                "store_remote": True,
+                "config": {
+                    "directory": backup_media_store_path,
+                }
+            }]
+
+        # This is a list of config that can be used to create the storage
+        # providers. The entries are tuples of (Class, class_config,
+        # MediaStorageProviderConfig), where Class is the class of the provider,
+        # the class_config the config to pass to it, and
+        # MediaStorageProviderConfig are options for StorageProviderWrapper.
+        #
+        # We don't create the storage providers here as not all workers need
+        # them to be started.
+        self.media_storage_providers = []
+
+        for provider_config in storage_providers:
+            # We special case the module "file_system" so as not to need to
+            # expose FileStorageProviderBackend
+            if provider_config["module"] == "file_system":
+                provider_config["module"] = (
+                    "synapse.rest.media.v1.storage_provider"
+                    ".FileStorageProviderBackend"
+                )
+
+            provider_class, parsed_config = load_module(provider_config)
+
+            wrapper_config = MediaStorageProviderConfig(
+                provider_config.get("store_local", False),
+                provider_config.get("store_remote", False),
+                provider_config.get("store_synchronous", False),
+            )
+
+            self.media_storage_providers.append(
+                (provider_class, parsed_config, wrapper_config,)
+            )
+
         self.uploads_path = self.ensure_directory(config["uploads_path"])
         self.dynamic_thumbnails = config["dynamic_thumbnails"]
         self.thumbnail_requirements = parse_thumbnail_requirements(
@@ -127,13 +182,19 @@ class ContentRepositoryConfig(Config):
         # Directory where uploaded images and attachments are stored.
         media_store_path: "%(media_store)s"
 
-        # A secondary directory where uploaded images and attachments are
-        # stored as a backup.
-        # backup_media_store_path: "%(media_store)s"
-
-        # Whether to wait for successful write to backup media store before
-        # returning successfully.
-        # synchronous_backup_media_store: false
+        # Media storage providers allow media to be stored in different
+        # locations.
+        # media_storage_providers:
+        # - module: file_system
+        #   # Whether to write new local files.
+        #   store_local: false
+        #   # Whether to write new remote media
+        #   store_remote: false
+        #   # Whether to block upload requests waiting for write to this
+        #   # provider to complete
+        #   store_synchronous: false
+        #   config:
+        #     directory: /mnt/some/other/directory
 
         # Directory where in-progress uploads are stored.
         uploads_path: "%(uploads_path)s"
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 436dd8a6fe..8f0b6d1f28 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -55,6 +55,17 @@ class ServerConfig(Config):
             "block_non_admin_invites", False,
         )
 
+        # FIXME: federation_domain_whitelist needs sytests
+        self.federation_domain_whitelist = None
+        federation_domain_whitelist = config.get(
+            "federation_domain_whitelist", None
+        )
+        # turn the whitelist into a hash for speed of lookup
+        if federation_domain_whitelist is not None:
+            self.federation_domain_whitelist = {}
+            for domain in federation_domain_whitelist:
+                self.federation_domain_whitelist[domain] = True
+
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != '/':
                 self.public_baseurl += '/'
@@ -210,6 +221,17 @@ class ServerConfig(Config):
         # (except those sent by local server admins). The default is False.
         # block_non_admin_invites: True
 
+        # Restrict federation to the following whitelist of domains.
+        # N.B. we recommend also firewalling your federation listener to limit
+        # inbound federation traffic as early as possible, rather than relying
+        # purely on this application-layer restriction.  If not specified, the
+        # default is to whitelist everything.
+        #
+        # federation_domain_whitelist:
+        #  - lon.example.com
+        #  - nyc.example.com
+        #  - syd.example.com
+
         # List of ports that Synapse should listen on, their purpose and their
         # configuration.
         listeners:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b1fe03f702..813907f7f2 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -23,7 +23,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import Membership
 from synapse.api.errors import (
-    CodeMessageException, HttpResponseException, SynapseError,
+    CodeMessageException, HttpResponseException, SynapseError, FederationDeniedError
 )
 from synapse.events import builder
 from synapse.federation.federation_base import (
@@ -266,6 +266,9 @@ class FederationClient(FederationBase):
             except NotRetryingDestination as e:
                 logger.info(e.message)
                 continue
+            except FederationDeniedError as e:
+                logger.info(e.message)
+                continue
             except Exception as e:
                 pdu_attempts[destination] = now
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 9d39f46583..a141ec9953 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,7 +19,7 @@ from twisted.internet import defer
 from .persistence import TransactionActions
 from .units import Transaction, Edu
 
-from synapse.api.errors import HttpResponseException
+from synapse.api.errors import HttpResponseException, FederationDeniedError
 from synapse.util import logcontext, PreserveLoggingContext
 from synapse.util.async import run_on_reactor
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -490,6 +490,8 @@ class TransactionQueue(object):
                     (e.retry_last_ts + e.retry_interval) / 1000.0
                 ),
             )
+        except FederationDeniedError as e:
+            logger.info(e)
         except Exception as e:
             logger.warn(
                 "TX [%s] Failed to send transaction: %s",
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 1f3ce238f6..5488e82985 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -212,6 +212,9 @@ class TransportLayerClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if the remote destination
+            is not in our federation whitelist
         """
         valid_memberships = {Membership.JOIN, Membership.LEAVE}
         if membership not in valid_memberships:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 2b02b021ec..06c16ba4fa 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.urls import FEDERATION_PREFIX as PREFIX
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, SynapseError, FederationDeniedError
 from synapse.http.server import JsonResource
 from synapse.http.servlet import (
     parse_json_object_from_request, parse_integer_from_args, parse_string_from_args,
@@ -81,6 +81,7 @@ class Authenticator(object):
         self.keyring = hs.get_keyring()
         self.server_name = hs.hostname
         self.store = hs.get_datastore()
+        self.federation_domain_whitelist = hs.config.federation_domain_whitelist
 
     # A method just so we can pass 'self' as the authenticator to the Servlets
     @defer.inlineCallbacks
@@ -92,6 +93,12 @@ class Authenticator(object):
             "signatures": {},
         }
 
+        if (
+            self.federation_domain_whitelist is not None and
+            self.server_name not in self.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(self.server_name)
+
         if content is not None:
             json_request["content"] = content
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 2152efc692..0e83453851 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 from synapse.api import errors
 from synapse.api.constants import EventTypes
+from synapse.api.errors import FederationDeniedError
 from synapse.util import stringutils
 from synapse.util.async import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -513,6 +514,9 @@ class DeviceListEduUpdater(object):
                     # This makes it more likely that the device lists will
                     # eventually become consistent.
                     return
+                except FederationDeniedError as e:
+                    logger.info(e)
+                    return
                 except Exception:
                     # TODO: Remember that we are now out of sync and try again
                     # later
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 5af8abf66b..9aa95f89e6 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -19,7 +19,9 @@ import logging
 from canonicaljson import encode_canonical_json
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError, CodeMessageException
+from synapse.api.errors import (
+    SynapseError, CodeMessageException, FederationDeniedError,
+)
 from synapse.types import get_domain_from_id, UserID
 from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
 from synapse.util.retryutils import NotRetryingDestination
@@ -140,6 +142,10 @@ class E2eKeysHandler(object):
                 failures[destination] = {
                     "status": 503, "message": "Not ready for retry",
                 }
+            except FederationDeniedError as e:
+                failures[destination] = {
+                    "status": 403, "message": "Federation Denied",
+                }
             except Exception as e:
                 # include ConnectionRefused and other errors
                 failures[destination] = {
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ac70730885..677532c87b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -22,6 +22,7 @@ from ._base import BaseHandler
 
 from synapse.api.errors import (
     AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+    FederationDeniedError,
 )
 from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.events.validator import EventValidator
@@ -782,6 +783,9 @@ class FederationHandler(BaseHandler):
                 except NotRetryingDestination as e:
                     logger.info(e.message)
                     continue
+                except FederationDeniedError as e:
+                    logger.info(e)
+                    continue
                 except Exception as e:
                     logger.exception(
                         "Failed to backfill from %s because %s",
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 5b808beac1..9021d4d57f 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -25,6 +25,7 @@ from synapse.http.client import CaptchaServerHttpClient
 from synapse import types
 from synapse.types import UserID
 from synapse.util.async import run_on_reactor
+from synapse.util.threepids import check_3pid_allowed
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
@@ -293,7 +294,7 @@ class RegistrationHandler(BaseHandler):
         """
 
         for c in threepidCreds:
-            logger.info("validating theeepidcred sid %s on id server %s",
+            logger.info("validating threepidcred sid %s on id server %s",
                         c['sid'], c['idServer'])
             try:
                 identity_handler = self.hs.get_handlers().identity_handler
@@ -307,6 +308,11 @@ class RegistrationHandler(BaseHandler):
             logger.info("got threepid with medium '%s' and address '%s'",
                         threepid['medium'], threepid['address'])
 
+            if not check_3pid_allowed(self.hs, threepid['medium'], threepid['address']):
+                raise RegistrationError(
+                    403, "Third party identifier is not allowed"
+                )
+
     @defer.inlineCallbacks
     def bind_emails(self, user_id, threepidCreds):
         """Links emails with a user ID and informs an identity server.
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index bb40075387..dfa09141ed 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -203,7 +203,8 @@ class RoomListHandler(BaseHandler):
         if limit:
             step = limit + 1
         else:
-            step = len(rooms_to_scan)
+            # step cannot be zero
+            step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
 
         chunk = []
         for i in xrange(0, len(rooms_to_scan), step):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 833496b72d..9145405cb0 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -27,7 +27,7 @@ import synapse.metrics
 from canonicaljson import encode_canonical_json
 
 from synapse.api.errors import (
-    SynapseError, Codes, HttpResponseException,
+    SynapseError, Codes, HttpResponseException, FederationDeniedError,
 )
 
 from signedjson.sign import sign_json
@@ -123,11 +123,22 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``HTTPRequestException``: if we get an HTTP response
                 code >= 300.
+
             Fails with ``NotRetryingDestination`` if we are not yet ready
                 to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+                is not on our federation whitelist
+
             (May also fail with plenty of other Exceptions for things like DNS
                 failures, connection failures, SSL failures.)
         """
+        if (
+            self.hs.config.federation_domain_whitelist and
+            destination not in self.hs.config.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(destination)
+
         limiter = yield synapse.util.retryutils.get_retry_limiter(
             destination,
             self.clock,
@@ -308,6 +319,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
 
         if not json_data_callback:
@@ -368,6 +382,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
 
         def body_callback(method, url_bytes, headers_dict):
@@ -422,6 +439,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
         logger.debug("get_json args: %s", args)
 
@@ -475,6 +495,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
 
         response = yield self._request(
@@ -518,6 +541,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
 
         encoded_args = {}
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 2265e6e8d6..e0cfb7d08f 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -146,10 +146,15 @@ def runUntilCurrentTimer(func):
             num_pending += 1
 
         num_pending += len(reactor.threadCallQueue)
-
         start = time.time() * 1000
         ret = func(*args, **kwargs)
         end = time.time() * 1000
+
+        # record the amount of wallclock time spent running pending calls.
+        # This is a proxy for the actual amount of time between reactor polls,
+        # since about 25% of time is actually spent running things triggered by
+        # I/O events, but that is harder to capture without rewriting half the
+        # reactor.
         tick_time.inc_by(end - start)
         pending_calls_metric.inc_by(num_pending)
 
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 32ed1d3ab2..5c5fa8f7ab 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -70,10 +70,15 @@ class RegisterRestServlet(ClientV1RestServlet):
         self.handlers = hs.get_handlers()
 
     def on_GET(self, request):
+
+        require_email = 'email' in self.hs.config.registrations_require_3pid
+        require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid
+
+        flows = []
         if self.hs.config.enable_registration_captcha:
-            return (
-                200,
-                {"flows": [
+            # only support the email-only flow if we don't require MSISDN 3PIDs
+            if not require_msisdn:
+                flows.extend([
                     {
                         "type": LoginType.RECAPTCHA,
                         "stages": [
@@ -82,27 +87,34 @@ class RegisterRestServlet(ClientV1RestServlet):
                             LoginType.PASSWORD
                         ]
                     },
+                ])
+            # only support 3PIDless registration if no 3PIDs are required
+            if not require_email and not require_msisdn:
+                flows.extend([
                     {
                         "type": LoginType.RECAPTCHA,
                         "stages": [LoginType.RECAPTCHA, LoginType.PASSWORD]
                     }
-                ]}
-            )
+                ])
         else:
-            return (
-                200,
-                {"flows": [
+            # only support the email-only flow if we don't require MSISDN 3PIDs
+            if require_email or not require_msisdn:
+                flows.extend([
                     {
                         "type": LoginType.EMAIL_IDENTITY,
                         "stages": [
                             LoginType.EMAIL_IDENTITY, LoginType.PASSWORD
                         ]
-                    },
+                    }
+                ])
+            # only support 3PIDless registration if no 3PIDs are required
+            if not require_email and not require_msisdn:
+                flows.extend([
                     {
                         "type": LoginType.PASSWORD
                     }
-                ]}
-            )
+                ])
+        return (200, {"flows": flows})
 
     @defer.inlineCallbacks
     def on_POST(self, request):
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 682a0af9fc..867ec8602c 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -195,15 +195,20 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
         content = parse_json_object_from_request(request)
 
+        event_dict = {
+            "type": event_type,
+            "content": content,
+            "room_id": room_id,
+            "sender": requester.user.to_string(),
+        }
+
+        if 'ts' in request.args and requester.app_service:
+            event_dict['origin_server_ts'] = parse_integer(request, "ts", 0)
+
         msg_handler = self.handlers.message_handler
         event = yield msg_handler.create_and_send_nonmember_event(
             requester,
-            {
-                "type": event_type,
-                "content": content,
-                "room_id": room_id,
-                "sender": requester.user.to_string(),
-            },
+            event_dict,
             txn_id=txn_id,
         )
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 385a3ad2ec..30523995af 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -26,6 +26,7 @@ from synapse.http.servlet import (
 )
 from synapse.util.async import run_on_reactor
 from synapse.util.msisdn import phone_number_to_msisdn
+from synapse.util.threepids import check_3pid_allowed
 from ._base import client_v2_patterns, interactive_auth_handler
 
 logger = logging.getLogger(__name__)
@@ -47,6 +48,11 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
             'id_server', 'client_secret', 'email', 'send_attempt'
         ])
 
+        if not check_3pid_allowed(self.hs, "email", body['email']):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
             'email', body['email']
         )
@@ -78,6 +84,11 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
 
+        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.datastore.get_user_id_by_threepid(
             'msisdn', msisdn
         )
@@ -217,6 +228,11 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
         if absent:
             raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
 
+        if not check_3pid_allowed(self.hs, "email", body['email']):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.datastore.get_user_id_by_threepid(
             'email', body['email']
         )
@@ -255,6 +271,11 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
 
+        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.datastore.get_user_id_by_threepid(
             'msisdn', msisdn
         )
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index e9d88a8895..c6f4680a76 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -26,6 +26,7 @@ from synapse.http.servlet import (
     RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string
 )
 from synapse.util.msisdn import phone_number_to_msisdn
+from synapse.util.threepids import check_3pid_allowed
 
 from ._base import client_v2_patterns, interactive_auth_handler
 
@@ -70,6 +71,11 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
             'id_server', 'client_secret', 'email', 'send_attempt'
         ])
 
+        if not check_3pid_allowed(self.hs, "email", body['email']):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
             'email', body['email']
         )
@@ -105,6 +111,11 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
 
+        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+            raise SynapseError(
+                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+            )
+
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
             'msisdn', msisdn
         )
@@ -305,31 +316,67 @@ class RegisterRestServlet(RestServlet):
         if 'x_show_msisdn' in body and body['x_show_msisdn']:
             show_msisdn = True
 
+        # FIXME: need a better error than "no auth flow found" for scenarios
+        # where we required 3PID for registration but the user didn't give one
+        require_email = 'email' in self.hs.config.registrations_require_3pid
+        require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid
+
+        flows = []
         if self.hs.config.enable_registration_captcha:
-            flows = [
-                [LoginType.RECAPTCHA],
-                [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
-            ]
+            # only support 3PIDless registration if no 3PIDs are required
+            if not require_email and not require_msisdn:
+                flows.extend([[LoginType.RECAPTCHA]])
+            # only support the email-only flow if we don't require MSISDN 3PIDs
+            if not require_msisdn:
+                flows.extend([[LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA]])
+
             if show_msisdn:
+                # only support the MSISDN-only flow if we don't require email 3PIDs
+                if not require_email:
+                    flows.extend([[LoginType.MSISDN, LoginType.RECAPTCHA]])
+                # always let users provide both MSISDN & email
                 flows.extend([
-                    [LoginType.MSISDN, LoginType.RECAPTCHA],
                     [LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
                 ])
         else:
-            flows = [
-                [LoginType.DUMMY],
-                [LoginType.EMAIL_IDENTITY],
-            ]
+            # only support 3PIDless registration if no 3PIDs are required
+            if not require_email and not require_msisdn:
+                flows.extend([[LoginType.DUMMY]])
+            # only support the email-only flow if we don't require MSISDN 3PIDs
+            if not require_msisdn:
+                flows.extend([[LoginType.EMAIL_IDENTITY]])
+
             if show_msisdn:
+                # only support the MSISDN-only flow if we don't require email 3PIDs
+                if not require_email or require_msisdn:
+                    flows.extend([[LoginType.MSISDN]])
+                # always let users provide both MSISDN & email
                 flows.extend([
-                    [LoginType.MSISDN],
-                    [LoginType.MSISDN, LoginType.EMAIL_IDENTITY],
+                    [LoginType.MSISDN, LoginType.EMAIL_IDENTITY]
                 ])
 
         auth_result, params, session_id = yield self.auth_handler.check_auth(
             flows, body, self.hs.get_ip_from_request(request)
         )
 
+        # Check that we're not trying to register a denied 3pid.
+        #
+        # the user-facing checks will probably already have happened in
+        # /register/email/requestToken when we requested a 3pid, but that's not
+        # guaranteed.
+
+        if auth_result:
+            for login_type in [LoginType.EMAIL_IDENTITY, LoginType.MSISDN]:
+                if login_type in auth_result:
+                    medium = auth_result[login_type]['medium']
+                    address = auth_result[login_type]['address']
+
+                    if not check_3pid_allowed(self.hs, medium, address):
+                        raise SynapseError(
+                            403, "Third party identifier is not allowed",
+                            Codes.THREEPID_DENIED,
+                        )
+
         if registered_user_id is not None:
             logger.info(
                 "Already registered user ID %r for this session",
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index cc2842aa72..17e6079cba 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -93,6 +93,7 @@ class RemoteKey(Resource):
         self.store = hs.get_datastore()
         self.version_string = hs.version_string
         self.clock = hs.get_clock()
+        self.federation_domain_whitelist = hs.config.federation_domain_whitelist
 
     def render_GET(self, request):
         self.async_render_GET(request)
@@ -137,6 +138,13 @@ class RemoteKey(Resource):
         logger.info("Handling query for keys %r", query)
         store_queries = []
         for server_name, key_ids in query.items():
+            if (
+                self.federation_domain_whitelist is not None and
+                server_name not in self.federation_domain_whitelist
+            ):
+                logger.debug("Federation denied with %s", server_name)
+                continue
+
             if not key_ids:
                 key_ids = (None,)
             for key_id in key_ids:
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index b2c76440b7..485db8577a 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -27,15 +27,14 @@ from .identicon_resource import IdenticonResource
 from .preview_url_resource import PreviewUrlResource
 from .filepath import MediaFilePaths
 from .thumbnailer import Thumbnailer
-from .storage_provider import (
-    StorageProviderWrapper, FileStorageProviderBackend,
-)
+from .storage_provider import StorageProviderWrapper
 from .media_storage import MediaStorage
 
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.util.stringutils import random_string
-from synapse.api.errors import SynapseError, HttpResponseException, \
-    NotFoundError
+from synapse.api.errors import (
+    SynapseError, HttpResponseException, NotFoundError, FederationDeniedError,
+)
 
 from synapse.util.async import Linearizer
 from synapse.util.stringutils import is_ascii
@@ -77,21 +76,19 @@ class MediaRepository(object):
         self.recently_accessed_remotes = set()
         self.recently_accessed_locals = set()
 
+        self.federation_domain_whitelist = hs.config.federation_domain_whitelist
+
         # List of StorageProviders where we should search for media and
         # potentially upload to.
         storage_providers = []
 
-        # TODO: Move this into config and allow other storage providers to be
-        # defined.
-        if hs.config.backup_media_store_path:
-            backend = FileStorageProviderBackend(
-                self.primary_base_path, hs.config.backup_media_store_path,
-            )
+        for clz, provider_config, wrapper_config in hs.config.media_storage_providers:
+            backend = clz(hs, provider_config)
             provider = StorageProviderWrapper(
                 backend,
-                store=True,
-                store_synchronous=hs.config.synchronous_backup_media_store,
-                store_remote=True,
+                store_local=wrapper_config.store_local,
+                store_remote=wrapper_config.store_remote,
+                store_synchronous=wrapper_config.store_synchronous,
             )
             storage_providers.append(provider)
 
@@ -222,6 +219,12 @@ class MediaRepository(object):
             Deferred: Resolves once a response has successfully been written
                 to request
         """
+        if (
+            self.federation_domain_whitelist is not None and
+            server_name not in self.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(server_name)
+
         self.mark_recently_accessed(server_name, media_id)
 
         # We linearize here to ensure that we don't try and download remote
@@ -256,6 +259,12 @@ class MediaRepository(object):
         Returns:
             Deferred[dict]: The media_info of the file
         """
+        if (
+            self.federation_domain_whitelist is not None and
+            server_name not in self.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(server_name)
+
         # We linearize here to ensure that we don't try and download remote
         # media multiple times concurrently
         key = (server_name, media_id)
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 001e84578e..041ae396cd 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -164,6 +164,14 @@ class MediaStorage(object):
             str
         """
         if file_info.url_cache:
+            if file_info.thumbnail:
+                return self.filepaths.url_cache_thumbnail_rel(
+                    media_id=file_info.file_id,
+                    width=file_info.thumbnail_width,
+                    height=file_info.thumbnail_height,
+                    content_type=file_info.thumbnail_type,
+                    method=file_info.thumbnail_method,
+                )
             return self.filepaths.url_cache_filepath_rel(file_info.file_id)
 
         if file_info.server_name:
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 2ad602e101..c188192f2b 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -17,6 +17,7 @@ from twisted.internet import defer, threads
 
 from .media_storage import FileResponder
 
+from synapse.config._base import Config
 from synapse.util.logcontext import preserve_fn
 
 import logging
@@ -64,19 +65,19 @@ class StorageProviderWrapper(StorageProvider):
 
     Args:
         backend (StorageProvider)
-        store (bool): Whether to store new files or not.
+        store_local (bool): Whether to store new local files or not.
         store_synchronous (bool): Whether to wait for file to be successfully
             uploaded, or todo the upload in the backgroud.
         store_remote (bool): Whether remote media should be uploaded
     """
-    def __init__(self, backend, store, store_synchronous, store_remote):
+    def __init__(self, backend, store_local, store_synchronous, store_remote):
         self.backend = backend
-        self.store = store
+        self.store_local = store_local
         self.store_synchronous = store_synchronous
         self.store_remote = store_remote
 
     def store_file(self, path, file_info):
-        if not self.store:
+        if not file_info.server_name and not self.store_local:
             return defer.succeed(None)
 
         if file_info.server_name and not self.store_remote:
@@ -97,13 +98,13 @@ class FileStorageProviderBackend(StorageProvider):
     """A storage provider that stores files in a directory on a filesystem.
 
     Args:
-        cache_directory (str): Base path of the local media repository
-        base_directory (str): Base path to store new files
+        hs (HomeServer)
+        config: The config returned by `parse_config`.
     """
 
-    def __init__(self, cache_directory, base_directory):
-        self.cache_directory = cache_directory
-        self.base_directory = base_directory
+    def __init__(self, hs, config):
+        self.cache_directory = hs.config.media_store_path
+        self.base_directory = config
 
     def store_file(self, path, file_info):
         """See StorageProvider.store_file"""
@@ -125,3 +126,15 @@ class FileStorageProviderBackend(StorageProvider):
         backup_fname = os.path.join(self.base_directory, path)
         if os.path.isfile(backup_fname):
             return FileResponder(open(backup_fname, "rb"))
+
+    @staticmethod
+    def parse_config(config):
+        """Called on startup to parse config supplied. This should parse
+        the config and raise if there is a problem.
+
+        The returned value is passed into the constructor.
+
+        In this case we only care about a single param, the directory, so let's
+        just pull that out.
+        """
+        return Config.ensure_directory(config["directory"])
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index c09f2dec41..12e84a2b7c 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -67,7 +67,7 @@ class ThumbnailResource(Resource):
                 yield self._respond_local_thumbnail(
                     request, media_id, width, height, method, m_type
                 )
-            self.media_repo.mark_recently_accessed(server_name, media_id)
+            self.media_repo.mark_recently_accessed(None, media_id)
         else:
             if self.dynamic_thumbnails:
                 yield self._select_or_generate_remote_thumbnail(
@@ -79,7 +79,7 @@ class ThumbnailResource(Resource):
                     request, server_name, media_id,
                     width, height, method, m_type
                 )
-            self.media_repo.mark_recently_accessed(None, media_id)
+            self.media_repo.mark_recently_accessed(server_name, media_id)
 
     @defer.inlineCallbacks
     def _respond_local_thumbnail(self, request, media_id, width, height,
diff --git a/synapse/server.py b/synapse/server.py
index 99693071b6..ff8a8fbc46 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -307,6 +307,23 @@ class HomeServer(object):
             **self.db_config.get("args", {})
         )
 
+    def get_db_conn(self, run_new_connection=True):
+        """Makes a new connection to the database, skipping the db pool
+
+        Returns:
+            Connection: a connection object implementing the PEP-249 spec
+        """
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
     def build_media_repository_resource(self):
         # build the media repo resource. This indirects through the HomeServer
         # to ensure that we only have a single instance of
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index f150ef0103..dfdcbb3181 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -641,13 +641,12 @@ class UserDirectoryStore(SQLBaseStore):
         """
 
         if self.hs.config.user_directory_search_all_users:
-            # dummy to keep the number of binds & aliases the same
+            # make s.user_id null to keep the ordering algorithm happy
             join_clause = """
-                LEFT JOIN (
-                    SELECT NULL as user_id WHERE NULL = ?
-                ) AS s USING (user_id)"
+                CROSS JOIN (SELECT NULL as user_id) AS s
             """
-            where_clause = ""
+            join_args = ()
+            where_clause = "1=1"
         else:
             join_clause = """
                 LEFT JOIN users_in_public_rooms AS p USING (user_id)
@@ -656,6 +655,7 @@ class UserDirectoryStore(SQLBaseStore):
                     WHERE user_id = ? AND share_private
                 ) AS s USING (user_id)
             """
+            join_args = (user_id,)
             where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)"
 
         if isinstance(self.database_engine, PostgresEngine):
@@ -697,7 +697,7 @@ class UserDirectoryStore(SQLBaseStore):
                     avatar_url IS NULL
                 LIMIT ?
             """ % (join_clause, where_clause)
-            args = (user_id, full_query, exact_query, prefix_query, limit + 1,)
+            args = join_args + (full_query, exact_query, prefix_query, limit + 1,)
         elif isinstance(self.database_engine, Sqlite3Engine):
             search_query = _parse_query_sqlite(search_term)
 
@@ -715,7 +715,7 @@ class UserDirectoryStore(SQLBaseStore):
                     avatar_url IS NULL
                 LIMIT ?
             """ % (join_clause, where_clause)
-            args = (user_id, search_query, limit + 1)
+            args = join_args + (search_query, limit + 1)
         else:
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
new file mode 100644
index 0000000000..90a2608d6f
--- /dev/null
+++ b/synapse/util/file_consumer.py
@@ -0,0 +1,139 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import threads, reactor
+
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+
+import Queue
+
+
+class BackgroundFileConsumer(object):
+    """A consumer that writes to a file like object. Supports both push
+    and pull producers
+
+    Args:
+        file_obj (file): The file like object to write to. Closed when
+            finished.
+    """
+
+    # For PushProducers pause if we have this many unwritten slices
+    _PAUSE_ON_QUEUE_SIZE = 5
+    # And resume once the size of the queue is less than this
+    _RESUME_ON_QUEUE_SIZE = 2
+
+    def __init__(self, file_obj):
+        self._file_obj = file_obj
+
+        # Producer we're registered with
+        self._producer = None
+
+        # True if PushProducer, false if PullProducer
+        self.streaming = False
+
+        # For PushProducers, indicates whether we've paused the producer and
+        # need to call resumeProducing before we get more data.
+        self._paused_producer = False
+
+        # Queue of slices of bytes to be written. When producer calls
+        # unregister a final None is sent.
+        self._bytes_queue = Queue.Queue()
+
+        # Deferred that is resolved when finished writing
+        self._finished_deferred = None
+
+        # If the _writer thread throws an exception it gets stored here.
+        self._write_exception = None
+
+    def registerProducer(self, producer, streaming):
+        """Part of IConsumer interface
+
+        Args:
+            producer (IProducer)
+            streaming (bool): True if push based producer, False if pull
+                based.
+        """
+        if self._producer:
+            raise Exception("registerProducer called twice")
+
+        self._producer = producer
+        self.streaming = streaming
+        self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer)
+        if not streaming:
+            self._producer.resumeProducing()
+
+    def unregisterProducer(self):
+        """Part of IProducer interface
+        """
+        self._producer = None
+        if not self._finished_deferred.called:
+            self._bytes_queue.put_nowait(None)
+
+    def write(self, bytes):
+        """Part of IProducer interface
+        """
+        if self._write_exception:
+            raise self._write_exception
+
+        if self._finished_deferred.called:
+            raise Exception("consumer has closed")
+
+        self._bytes_queue.put_nowait(bytes)
+
+        # If this is a PushProducer and the queue is getting behind
+        # then we pause the producer.
+        if self.streaming and self._bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE:
+            self._paused_producer = True
+            self._producer.pauseProducing()
+
+    def _writer(self):
+        """This is run in a background thread to write to the file.
+        """
+        try:
+            while self._producer or not self._bytes_queue.empty():
+                # If we've paused the producer check if we should resume the
+                # producer.
+                if self._producer and self._paused_producer:
+                    if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
+                        reactor.callFromThread(self._resume_paused_producer)
+
+                bytes = self._bytes_queue.get()
+
+                # If we get a None (or empty list) then that's a signal used
+                # to indicate we should check if we should stop.
+                if bytes:
+                    self._file_obj.write(bytes)
+
+                # If its a pull producer then we need to explicitly ask for
+                # more stuff.
+                if not self.streaming and self._producer:
+                    reactor.callFromThread(self._producer.resumeProducing)
+        except Exception as e:
+            self._write_exception = e
+            raise
+        finally:
+            self._file_obj.close()
+
+    def wait(self):
+        """Returns a deferred that resolves when finished writing to file
+        """
+        return make_deferred_yieldable(self._finished_deferred)
+
+    def _resume_paused_producer(self):
+        """Gets called if we should resume producing after being paused
+        """
+        if self._paused_producer and self._producer:
+            self._paused_producer = False
+            self._producer.resumeProducing()
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 1adedbb361..47b0bb5eb3 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -26,6 +26,18 @@ logger = logging.getLogger(__name__)
 
 class NotRetryingDestination(Exception):
     def __init__(self, retry_last_ts, retry_interval, destination):
+        """Raised by the limiter (and federation client) to indicate that we are
+        are deliberately not attempting to contact a given server.
+
+        Args:
+            retry_last_ts (int): the unix ts in milliseconds of our last attempt
+                to contact the server.  0 indicates that the last attempt was
+                successful or that we've never actually attempted to connect.
+            retry_interval (int): the time in milliseconds to wait until the next
+                attempt.
+            destination (str): the domain in question
+        """
+
         msg = "Not retrying server %s." % (destination,)
         super(NotRetryingDestination, self).__init__(msg)
 
diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py
new file mode 100644
index 0000000000..75efa0117b
--- /dev/null
+++ b/synapse/util/threepids.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+
+logger = logging.getLogger(__name__)
+
+
+def check_3pid_allowed(hs, medium, address):
+    """Checks whether a given format of 3PID is allowed to be used on this HS
+
+    Args:
+        hs (synapse.server.HomeServer): server
+        medium (str): 3pid medium - e.g. email, msisdn
+        address (str): address within that medium (e.g. "wotan@matrix.org")
+            msisdns need to first have been canonicalised
+    Returns:
+        bool: whether the 3PID medium/address is allowed to be added to this HS
+    """
+
+    if hs.config.allowed_local_3pids:
+        for constraint in hs.config.allowed_local_3pids:
+            logger.debug(
+                "Checking 3PID %s (%s) against %s (%s)",
+                address, medium, constraint['pattern'], constraint['medium'],
+            )
+            if (
+                medium == constraint['medium'] and
+                re.match(constraint['pattern'], address)
+            ):
+                return True
+    else:
+        return True
+
+    return False
diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py
index c899fecf5d..d4ec02ffc2 100644
--- a/tests/crypto/test_keyring.py
+++ b/tests/crypto/test_keyring.py
@@ -167,7 +167,7 @@ class KeyringTestCase(unittest.TestCase):
 
             # wait a tick for it to send the request to the perspectives server
             # (it first tries the datastore)
-            yield async.sleep(0.005)
+            yield async.sleep(1)   # XXX find out why this takes so long!
             self.http_client.post_json.assert_called_once()
 
             self.assertIs(LoggingContext.current_context(), context_11)
@@ -183,7 +183,7 @@ class KeyringTestCase(unittest.TestCase):
                 res_deferreds_2 = kr.verify_json_objects_for_server(
                     [("server10", json1)],
                 )
-                yield async.sleep(0.005)
+                yield async.sleep(01)
                 self.http_client.post_json.assert_not_called()
                 res_deferreds_2[0].addBoth(self.check_context, None)
 
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index 19f5ed6bce..d92bf240b1 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -143,7 +143,6 @@ class E2eKeysHandlerTestCase(unittest.TestCase):
         except errors.SynapseError:
             pass
 
-    @unittest.DEBUG
     @defer.inlineCallbacks
     def test_claim_one_time_key(self):
         local_user = "@boris:" + self.hs.hostname
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 81063f19a1..74f104e3b8 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -15,6 +15,8 @@
 from twisted.internet import defer, reactor
 from tests import unittest
 
+import tempfile
+
 from mock import Mock, NonCallableMock
 from tests.utils import setup_test_homeserver
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
@@ -41,7 +43,9 @@ class BaseSlavedStoreTestCase(unittest.TestCase):
         self.event_id = 0
 
         server_factory = ReplicationStreamProtocolFactory(self.hs)
-        listener = reactor.listenUNIX("\0xxx", server_factory)
+        # XXX: mktemp is unsafe and should never be used. but we're just a test.
+        path = tempfile.mktemp(prefix="base_slaved_store_test_case_socket")
+        listener = reactor.listenUNIX(path, server_factory)
         self.addCleanup(listener.stopListening)
         self.streamer = server_factory.streamer
 
@@ -49,7 +53,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase):
         client_factory = ReplicationClientFactory(
             self.hs, "client_name", self.replication_handler
         )
-        client_connector = reactor.connectUNIX("\0xxx", client_factory)
+        client_connector = reactor.connectUNIX(path, client_factory)
         self.addCleanup(client_factory.stopTrying)
         self.addCleanup(client_connector.disconnect)
 
diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py
index 096f771bea..8aba456510 100644
--- a/tests/rest/client/v2_alpha/test_register.py
+++ b/tests/rest/client/v2_alpha/test_register.py
@@ -49,6 +49,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
         self.hs.get_auth_handler = Mock(return_value=self.auth_handler)
         self.hs.get_device_handler = Mock(return_value=self.device_handler)
         self.hs.config.enable_registration = True
+        self.hs.config.registrations_require_3pid = []
         self.hs.config.auto_join_rooms = []
 
         # init the thing we're testing
diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py
new file mode 100644
index 0000000000..0891308f25
--- /dev/null
+++ b/tests/storage/test_user_directory.py
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.storage import UserDirectoryStore
+from synapse.storage.roommember import ProfileInfo
+from tests import unittest
+from tests.utils import setup_test_homeserver
+
+ALICE = "@alice:a"
+BOB = "@bob:b"
+BOBBY = "@bobby:a"
+
+
+class UserDirectoryStoreTestCase(unittest.TestCase):
+    @defer.inlineCallbacks
+    def setUp(self):
+        self.hs = yield setup_test_homeserver()
+        self.store = UserDirectoryStore(None, self.hs)
+
+        # alice and bob are both in !room_id. bobby is not but shares
+        # a homeserver with alice.
+        yield self.store.add_profiles_to_user_dir(
+            "!room:id",
+            {
+                ALICE: ProfileInfo(None, "alice"),
+                BOB: ProfileInfo(None, "bob"),
+                BOBBY: ProfileInfo(None, "bobby")
+            },
+        )
+        yield self.store.add_users_to_public_room(
+            "!room:id",
+            [ALICE, BOB],
+        )
+        yield self.store.add_users_who_share_room(
+            "!room:id",
+            False,
+            (
+                (ALICE, BOB),
+                (BOB, ALICE),
+            ),
+        )
+
+    @defer.inlineCallbacks
+    def test_search_user_dir(self):
+        # normally when alice searches the directory she should just find
+        # bob because bobby doesn't share a room with her.
+        r = yield self.store.search_user_dir(ALICE, "bob", 10)
+        self.assertFalse(r["limited"])
+        self.assertEqual(1, len(r["results"]))
+        self.assertDictEqual(r["results"][0], {
+            "user_id": BOB,
+            "display_name": "bob",
+            "avatar_url": None,
+        })
+
+    @defer.inlineCallbacks
+    def test_search_user_dir_all_users(self):
+        self.hs.config.user_directory_search_all_users = True
+        try:
+            r = yield self.store.search_user_dir(ALICE, "bob", 10)
+            self.assertFalse(r["limited"])
+            self.assertEqual(2, len(r["results"]))
+            self.assertDictEqual(r["results"][0], {
+                "user_id": BOB,
+                "display_name": "bob",
+                "avatar_url": None,
+            })
+            self.assertDictEqual(r["results"][1], {
+                "user_id": BOBBY,
+                "display_name": "bobby",
+                "avatar_url": None,
+            })
+        finally:
+            self.hs.config.user_directory_search_all_users = False
diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py
new file mode 100644
index 0000000000..76e2234255
--- /dev/null
+++ b/tests/util/test_file_consumer.py
@@ -0,0 +1,176 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from twisted.internet import defer, reactor
+from mock import NonCallableMock
+
+from synapse.util.file_consumer import BackgroundFileConsumer
+
+from tests import unittest
+from StringIO import StringIO
+
+import threading
+
+
+class FileConsumerTests(unittest.TestCase):
+
+    @defer.inlineCallbacks
+    def test_pull_consumer(self):
+        string_file = StringIO()
+        consumer = BackgroundFileConsumer(string_file)
+
+        try:
+            producer = DummyPullProducer()
+
+            yield producer.register_with_consumer(consumer)
+
+            yield producer.write_and_wait("Foo")
+
+            self.assertEqual(string_file.getvalue(), "Foo")
+
+            yield producer.write_and_wait("Bar")
+
+            self.assertEqual(string_file.getvalue(), "FooBar")
+        finally:
+            consumer.unregisterProducer()
+
+        yield consumer.wait()
+
+        self.assertTrue(string_file.closed)
+
+    @defer.inlineCallbacks
+    def test_push_consumer(self):
+        string_file = BlockingStringWrite()
+        consumer = BackgroundFileConsumer(string_file)
+
+        try:
+            producer = NonCallableMock(spec_set=[])
+
+            consumer.registerProducer(producer, True)
+
+            consumer.write("Foo")
+            yield string_file.wait_for_n_writes(1)
+
+            self.assertEqual(string_file.buffer, "Foo")
+
+            consumer.write("Bar")
+            yield string_file.wait_for_n_writes(2)
+
+            self.assertEqual(string_file.buffer, "FooBar")
+        finally:
+            consumer.unregisterProducer()
+
+        yield consumer.wait()
+
+        self.assertTrue(string_file.closed)
+
+    @defer.inlineCallbacks
+    def test_push_producer_feedback(self):
+        string_file = BlockingStringWrite()
+        consumer = BackgroundFileConsumer(string_file)
+
+        try:
+            producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"])
+
+            resume_deferred = defer.Deferred()
+            producer.resumeProducing.side_effect = lambda: resume_deferred.callback(None)
+
+            consumer.registerProducer(producer, True)
+
+            number_writes = 0
+            with string_file.write_lock:
+                for _ in range(consumer._PAUSE_ON_QUEUE_SIZE):
+                    consumer.write("Foo")
+                    number_writes += 1
+
+                producer.pauseProducing.assert_called_once()
+
+            yield string_file.wait_for_n_writes(number_writes)
+
+            yield resume_deferred
+            producer.resumeProducing.assert_called_once()
+        finally:
+            consumer.unregisterProducer()
+
+        yield consumer.wait()
+
+        self.assertTrue(string_file.closed)
+
+
+class DummyPullProducer(object):
+    def __init__(self):
+        self.consumer = None
+        self.deferred = defer.Deferred()
+
+    def resumeProducing(self):
+        d = self.deferred
+        self.deferred = defer.Deferred()
+        d.callback(None)
+
+    def write_and_wait(self, bytes):
+        d = self.deferred
+        self.consumer.write(bytes)
+        return d
+
+    def register_with_consumer(self, consumer):
+        d = self.deferred
+        self.consumer = consumer
+        self.consumer.registerProducer(self, False)
+        return d
+
+
+class BlockingStringWrite(object):
+    def __init__(self):
+        self.buffer = ""
+        self.closed = False
+        self.write_lock = threading.Lock()
+
+        self._notify_write_deferred = None
+        self._number_of_writes = 0
+
+    def write(self, bytes):
+        with self.write_lock:
+            self.buffer += bytes
+            self._number_of_writes += 1
+
+        reactor.callFromThread(self._notify_write)
+
+    def close(self):
+        self.closed = True
+
+    def _notify_write(self):
+        "Called by write to indicate a write happened"
+        with self.write_lock:
+            if not self._notify_write_deferred:
+                return
+            d = self._notify_write_deferred
+            self._notify_write_deferred = None
+        d.callback(None)
+
+    @defer.inlineCallbacks
+    def wait_for_n_writes(self, n):
+        "Wait for n writes to have happened"
+        while True:
+            with self.write_lock:
+                if n <= self._number_of_writes:
+                    return
+
+                if not self._notify_write_deferred:
+                    self._notify_write_deferred = defer.Deferred()
+
+                d = self._notify_write_deferred
+
+            yield d
diff --git a/tests/utils.py b/tests/utils.py
index 44e5f75093..8efd3a3475 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -13,27 +13,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.http.server import HttpServer
-from synapse.api.errors import cs_error, CodeMessageException, StoreError
-from synapse.api.constants import EventTypes
-from synapse.storage.prepare_database import prepare_database
-from synapse.storage.engines import create_engine
-from synapse.server import HomeServer
-from synapse.federation.transport import server
-from synapse.util.ratelimitutils import FederationRateLimiter
-
-from synapse.util.logcontext import LoggingContext
-
-from twisted.internet import defer, reactor
-from twisted.enterprise.adbapi import ConnectionPool
-
-from collections import namedtuple
-from mock import patch, Mock
 import hashlib
+from inspect import getcallargs
 import urllib
 import urlparse
 
-from inspect import getcallargs
+from mock import Mock, patch
+from twisted.internet import defer, reactor
+
+from synapse.api.errors import CodeMessageException, cs_error
+from synapse.federation.transport import server
+from synapse.http.server import HttpServer
+from synapse.server import HomeServer
+from synapse.storage import PostgresEngine
+from synapse.storage.engines import create_engine
+from synapse.storage.prepare_database import prepare_database
+from synapse.util.logcontext import LoggingContext
+from synapse.util.ratelimitutils import FederationRateLimiter
+
+# set this to True to run the tests against postgres instead of sqlite.
+# It requires you to have a local postgres database called synapse_test, within
+# which ALL TABLES WILL BE DROPPED
+USE_POSTGRES_FOR_TESTS = False
 
 
 @defer.inlineCallbacks
@@ -57,36 +58,70 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
         config.worker_app = None
         config.email_enable_notifs = False
         config.block_non_admin_invites = False
+        config.federation_domain_whitelist = None
+        config.user_directory_search_all_users = False
 
         # disable user directory updates, because they get done in the
         # background, which upsets the test runner.
         config.update_user_directory = False
 
     config.use_frozen_dicts = True
-    config.database_config = {"name": "sqlite3"}
     config.ldap_enabled = False
 
     if "clock" not in kargs:
         kargs["clock"] = MockClock()
 
+    if USE_POSTGRES_FOR_TESTS:
+        config.database_config = {
+            "name": "psycopg2",
+            "args": {
+                "database": "synapse_test",
+                "cp_min": 1,
+                "cp_max": 5,
+            },
+        }
+    else:
+        config.database_config = {
+            "name": "sqlite3",
+            "args": {
+                "database": ":memory:",
+                "cp_min": 1,
+                "cp_max": 1,
+            },
+        }
+
+    db_engine = create_engine(config.database_config)
+
+    # we need to configure the connection pool to run the on_new_connection
+    # function, so that we can test code that uses custom sqlite functions
+    # (like rank).
+    config.database_config["args"]["cp_openfun"] = db_engine.on_new_connection
+
     if datastore is None:
-        db_pool = SQLiteMemoryDbPool()
-        yield db_pool.prepare()
         hs = HomeServer(
-            name, db_pool=db_pool, config=config,
+            name, config=config,
+            db_config=config.database_config,
             version_string="Synapse/tests",
-            database_engine=create_engine(config.database_config),
-            get_db_conn=db_pool.get_db_conn,
+            database_engine=db_engine,
             room_list_handler=object(),
             tls_server_context_factory=Mock(),
             **kargs
         )
+        db_conn = hs.get_db_conn()
+        # make sure that the database is empty
+        if isinstance(db_engine, PostgresEngine):
+            cur = db_conn.cursor()
+            cur.execute("SELECT tablename FROM pg_tables where schemaname='public'")
+            rows = cur.fetchall()
+            for r in rows:
+                cur.execute("DROP TABLE %s CASCADE" % r[0])
+        yield prepare_database(db_conn, db_engine, config)
         hs.setup()
     else:
         hs = HomeServer(
             name, db_pool=None, datastore=datastore, config=config,
             version_string="Synapse/tests",
-            database_engine=create_engine(config.database_config),
+            database_engine=db_engine,
             room_list_handler=object(),
             tls_server_context_factory=Mock(),
             **kargs
@@ -305,168 +340,6 @@ class MockClock(object):
         return d
 
 
-class SQLiteMemoryDbPool(ConnectionPool, object):
-    def __init__(self):
-        super(SQLiteMemoryDbPool, self).__init__(
-            "sqlite3", ":memory:",
-            cp_min=1,
-            cp_max=1,
-        )
-
-        self.config = Mock()
-        self.config.password_providers = []
-        self.config.database_config = {"name": "sqlite3"}
-
-    def prepare(self):
-        engine = self.create_engine()
-        return self.runWithConnection(
-            lambda conn: prepare_database(conn, engine, self.config)
-        )
-
-    def get_db_conn(self):
-        conn = self.connect()
-        engine = self.create_engine()
-        prepare_database(conn, engine, self.config)
-        return conn
-
-    def create_engine(self):
-        return create_engine(self.config.database_config)
-
-
-class MemoryDataStore(object):
-
-    Room = namedtuple(
-        "Room",
-        ["room_id", "is_public", "creator"]
-    )
-
-    def __init__(self):
-        self.tokens_to_users = {}
-        self.paths_to_content = {}
-
-        self.members = {}
-        self.rooms = {}
-
-        self.current_state = {}
-        self.events = []
-
-    class Snapshot(namedtuple("Snapshot", "room_id user_id membership_state")):
-        def fill_out_prev_events(self, event):
-            pass
-
-    def snapshot_room(self, room_id, user_id, state_type=None, state_key=None):
-        return self.Snapshot(
-            room_id, user_id, self.get_room_member(user_id, room_id)
-        )
-
-    def register(self, user_id, token, password_hash):
-        if user_id in self.tokens_to_users.values():
-            raise StoreError(400, "User in use.")
-        self.tokens_to_users[token] = user_id
-
-    def get_user_by_access_token(self, token):
-        try:
-            return {
-                "name": self.tokens_to_users[token],
-            }
-        except Exception:
-            raise StoreError(400, "User does not exist.")
-
-    def get_room(self, room_id):
-        try:
-            return self.rooms[room_id]
-        except Exception:
-            return None
-
-    def store_room(self, room_id, room_creator_user_id, is_public):
-        if room_id in self.rooms:
-            raise StoreError(409, "Conflicting room!")
-
-        room = MemoryDataStore.Room(
-            room_id=room_id,
-            is_public=is_public,
-            creator=room_creator_user_id
-        )
-        self.rooms[room_id] = room
-
-    def get_room_member(self, user_id, room_id):
-        return self.members.get(room_id, {}).get(user_id)
-
-    def get_room_members(self, room_id, membership=None):
-        if membership:
-            return [
-                v for k, v in self.members.get(room_id, {}).items()
-                if v.membership == membership
-            ]
-        else:
-            return self.members.get(room_id, {}).values()
-
-    def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
-        return [
-            m[user_id] for m in self.members.values()
-            if user_id in m and m[user_id].membership in membership_list
-        ]
-
-    def get_room_events_stream(self, user_id=None, from_key=None, to_key=None,
-                               limit=0, with_feedback=False):
-        return ([], from_key)  # TODO
-
-    def get_joined_hosts_for_room(self, room_id):
-        return defer.succeed([])
-
-    def persist_event(self, event):
-        if event.type == EventTypes.Member:
-            room_id = event.room_id
-            user = event.state_key
-            self.members.setdefault(room_id, {})[user] = event
-
-        if hasattr(event, "state_key"):
-            key = (event.room_id, event.type, event.state_key)
-            self.current_state[key] = event
-
-        self.events.append(event)
-
-    def get_current_state(self, room_id, event_type=None, state_key=""):
-        if event_type:
-            key = (room_id, event_type, state_key)
-            if self.current_state.get(key):
-                return [self.current_state.get(key)]
-            return None
-        else:
-            return [
-                e for e in self.current_state
-                if e[0] == room_id
-            ]
-
-    def set_presence_state(self, user_localpart, state):
-        return defer.succeed({"state": 0})
-
-    def get_presence_list(self, user_localpart, accepted):
-        return []
-
-    def get_room_events_max_id(self):
-        return "s0"  # TODO (erikj)
-
-    def get_send_event_level(self, room_id):
-        return defer.succeed(0)
-
-    def get_power_level(self, room_id, user_id):
-        return defer.succeed(0)
-
-    def get_add_state_level(self, room_id):
-        return defer.succeed(0)
-
-    def get_room_join_rule(self, room_id):
-        # TODO (erikj): This should be configurable
-        return defer.succeed("invite")
-
-    def get_ops_levels(self, room_id):
-        return defer.succeed((5, 5, 5))
-
-    def insert_client_ip(self, user, access_token, ip, user_agent):
-        return defer.succeed(None)
-
-
 def _format_call(args, kwargs):
     return ", ".join(
         ["%r" % (a) for a in args] +