summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py8
-rw-r--r--synapse/api/ratelimiting.py12
-rw-r--r--synapse/app/_base.py18
-rw-r--r--synapse/app/federation_sender.py30
-rwxr-xr-xsynapse/app/homeserver.py17
-rw-r--r--synapse/config/_base.py13
-rw-r--r--synapse/config/api.py12
-rw-r--r--synapse/config/appservice.py10
-rw-r--r--synapse/config/captcha.py23
-rw-r--r--synapse/config/database.py3
-rw-r--r--synapse/config/groups.py4
-rw-r--r--synapse/config/key.py37
-rw-r--r--synapse/config/metrics.py4
-rw-r--r--synapse/config/password.py15
-rw-r--r--synapse/config/ratelimiting.py95
-rw-r--r--synapse/config/registration.py29
-rw-r--r--synapse/config/repository.py77
-rw-r--r--synapse/config/room_directory.py10
-rw-r--r--synapse/config/saml2_config.py2
-rw-r--r--synapse/config/server.py11
-rw-r--r--synapse/config/tls.py5
-rw-r--r--synapse/config/user_directory.py9
-rw-r--r--synapse/config/voip.py8
-rw-r--r--synapse/config/workers.py28
-rw-r--r--synapse/federation/send_queue.py21
-rw-r--r--synapse/federation/sender/__init__.py465
-rw-r--r--synapse/federation/sender/per_destination_queue.py378
-rw-r--r--synapse/federation/sender/transaction_manager.py147
-rw-r--r--synapse/federation/transaction_queue.py716
-rw-r--r--synapse/federation/transport/client.py2
-rw-r--r--synapse/federation/transport/server.py14
-rw-r--r--synapse/handlers/_base.py1
-rw-r--r--synapse/handlers/auth.py60
-rw-r--r--synapse/handlers/deactivate_account.py1
-rw-r--r--synapse/handlers/directory.py8
-rw-r--r--synapse/handlers/events.py7
-rw-r--r--synapse/handlers/initial_sync.py6
-rw-r--r--synapse/handlers/message.py20
-rw-r--r--synapse/handlers/receipts.py79
-rw-r--r--synapse/handlers/register.py9
-rw-r--r--synapse/handlers/room_list.py14
-rw-r--r--synapse/handlers/room_member.py10
-rw-r--r--synapse/handlers/user_directory.py196
-rw-r--r--synapse/replication/tcp/resource.py18
-rw-r--r--synapse/rest/client/v1/admin.py55
-rw-r--r--synapse/rest/client/v1/login.py10
-rw-r--r--synapse/rest/client/v2_alpha/register.py4
-rw-r--r--synapse/rest/client/v2_alpha/user_directory.py6
-rw-r--r--synapse/server.py17
-rw-r--r--synapse/server.pyi3
-rw-r--r--synapse/storage/_base.py13
-rw-r--r--synapse/storage/background_updates.py8
-rw-r--r--synapse/storage/push_rule.py57
-rw-r--r--synapse/storage/room.py16
-rw-r--r--synapse/storage/schema/delta/53/user_dir_populate.sql30
-rw-r--r--synapse/storage/schema/delta/53/user_share.sql3
-rw-r--r--synapse/storage/schema/delta/53/users_in_public_rooms.sql28
-rw-r--r--synapse/storage/user_directory.py504
-rw-r--r--synapse/types.py12
59 files changed, 2132 insertions, 1286 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 5992d30623..ee646a97e8 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -788,9 +788,11 @@ class Auth(object):
 
         # Never fail an auth check for the server notices users or support user
         # This can be a problem where event creation is prohibited due to blocking
-        is_support = yield self.store.is_support_user(user_id)
-        if user_id == self.hs.config.server_notices_mxid or is_support:
-            return
+        if user_id is not None:
+            if user_id == self.hs.config.server_notices_mxid:
+                return
+            if (yield self.store.is_support_user(user_id)):
+                return
 
         if self.hs.config.hs_disabled:
             raise ResourceLimitError(
diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py
index ad68079eeb..296c4a1c17 100644
--- a/synapse/api/ratelimiting.py
+++ b/synapse/api/ratelimiting.py
@@ -14,6 +14,8 @@
 
 import collections
 
+from synapse.api.errors import LimitExceededError
+
 
 class Ratelimiter(object):
     """
@@ -82,3 +84,13 @@ class Ratelimiter(object):
                 break
             else:
                 del self.message_counts[key]
+
+    def ratelimit(self, key, time_now_s, rate_hz, burst_count, update=True):
+        allowed, time_allowed = self.can_do_action(
+            key, time_now_s, rate_hz, burst_count, update
+        )
+
+        if not allowed:
+            raise LimitExceededError(
+                retry_after_ms=int(1000 * (time_allowed - time_now_s)),
+            )
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 32e8b8a3f5..d4c6c4c8e2 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -63,12 +63,13 @@ def start_worker_reactor(appname, config):
 
     start_reactor(
         appname,
-        config.soft_file_limit,
-        config.gc_thresholds,
-        config.worker_pid_file,
-        config.worker_daemonize,
-        config.worker_cpu_affinity,
-        logger,
+        soft_file_limit=config.soft_file_limit,
+        gc_thresholds=config.gc_thresholds,
+        pid_file=config.worker_pid_file,
+        daemonize=config.worker_daemonize,
+        cpu_affinity=config.worker_cpu_affinity,
+        print_pidfile=config.print_pidfile,
+        logger=logger,
     )
 
 
@@ -79,6 +80,7 @@ def start_reactor(
         pid_file,
         daemonize,
         cpu_affinity,
+        print_pidfile,
         logger,
 ):
     """ Run the reactor in the main process
@@ -93,6 +95,7 @@ def start_reactor(
         pid_file (str): name of pid file to write to if daemonize is True
         daemonize (bool): true to run the reactor in a background process
         cpu_affinity (int|None): cpu affinity mask
+        print_pidfile (bool): whether to print the pid file, if daemonize is True
         logger (logging.Logger): logger instance to pass to Daemonize
     """
 
@@ -124,6 +127,9 @@ def start_reactor(
             reactor.run()
 
     if daemonize:
+        if print_pidfile:
+            print(pid_file)
+
         daemon = Daemonize(
             app=appname,
             pid=pid_file,
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index a461442fdc..9711a7147c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
 from synapse.federation import send_queue
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
@@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams import ReceiptsStream
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
+from synapse.types import ReadReceipt
 from synapse.util.async_helpers import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
@@ -202,6 +205,7 @@ class FederationSenderHandler(object):
     """
     def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
+        self._is_mine_id = hs.is_mine_id
         self.federation_sender = hs.get_federation_sender()
         self.replication_client = replication_client
 
@@ -234,6 +238,32 @@ class FederationSenderHandler(object):
         elif stream_name == "events":
             self.federation_sender.notify_new_events(token)
 
+        # ... and when new receipts happen
+        elif stream_name == ReceiptsStream.NAME:
+            run_as_background_process(
+                "process_receipts_for_federation", self._on_new_receipts, rows,
+            )
+
+    @defer.inlineCallbacks
+    def _on_new_receipts(self, rows):
+        """
+        Args:
+            rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
+                new receipts to be processed
+        """
+        for receipt in rows:
+            # we only want to send on receipts for our own users
+            if not self._is_mine_id(receipt.user_id):
+                continue
+            receipt_info = ReadReceipt(
+                receipt.room_id,
+                receipt.receipt_type,
+                receipt.user_id,
+                [receipt.event_id],
+                receipt.data,
+            )
+            yield self.federation_sender.send_read_receipt(receipt_info)
+
     @defer.inlineCallbacks
     def update_token(self, token):
         try:
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e8b6cc3114..869c028d1f 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -376,6 +376,7 @@ def setup(config_options):
     logger.info("Database prepared in %s.", config.database_config['name'])
 
     hs.setup()
+    hs.setup_master()
 
     @defer.inlineCallbacks
     def do_acme():
@@ -636,17 +637,15 @@ def run(hs):
         # be quite busy the first few minutes
         clock.call_later(5 * 60, start_phone_stats_home)
 
-    if hs.config.daemonize and hs.config.print_pidfile:
-        print(hs.config.pid_file)
-
     _base.start_reactor(
         "synapse-homeserver",
-        hs.config.soft_file_limit,
-        hs.config.gc_thresholds,
-        hs.config.pid_file,
-        hs.config.daemonize,
-        hs.config.cpu_affinity,
-        logger,
+        soft_file_limit=hs.config.soft_file_limit,
+        gc_thresholds=hs.config.gc_thresholds,
+        pid_file=hs.config.pid_file,
+        daemonize=hs.config.daemonize,
+        cpu_affinity=hs.config.cpu_affinity,
+        print_pidfile=hs.config.print_pidfile,
+        logger=logger,
     )
 
 
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index c4d3087fa4..a219a83550 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -214,14 +214,20 @@ class Config(object):
             " Defaults to the directory containing the last config file",
         )
 
+        obj = cls()
+
+        obj.invoke_all("add_arguments", config_parser)
+
         config_args = config_parser.parse_args(argv)
 
         config_files = find_config_files(search_paths=config_args.config_path)
 
-        obj = cls()
         obj.read_config_files(
             config_files, keys_directory=config_args.keys_directory, generate_keys=False
         )
+
+        obj.invoke_all("read_arguments", config_args)
+
         return obj
 
     @classmethod
@@ -399,7 +405,10 @@ class Config(object):
             self.invoke_all("generate_files", config)
             return
 
-        self.invoke_all("read_config", config)
+        self.parse_config_dict(config)
+
+    def parse_config_dict(self, config_dict):
+        self.invoke_all("read_config", config_dict)
 
 
 def find_config_files(search_paths):
diff --git a/synapse/config/api.py b/synapse/config/api.py
index e8a753f002..5eb4f86fa2 100644
--- a/synapse/config/api.py
+++ b/synapse/config/api.py
@@ -34,10 +34,10 @@ class ApiConfig(Config):
 
         # A list of event types that will be included in the room_invite_state
         #
-        room_invite_state_types:
-            - "{JoinRules}"
-            - "{CanonicalAlias}"
-            - "{RoomAvatar}"
-            - "{RoomEncryption}"
-            - "{Name}"
+        #room_invite_state_types:
+        #  - "{JoinRules}"
+        #  - "{CanonicalAlias}"
+        #  - "{RoomAvatar}"
+        #  - "{RoomEncryption}"
+        #  - "{Name}"
         """.format(**vars(EventTypes))
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index c260d59464..9e64c76544 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -37,14 +37,16 @@ class AppServiceConfig(Config):
 
     def default_config(cls, **kwargs):
         return """\
-        # A list of application service config file to use
+        # A list of application service config files to use
         #
-        app_service_config_files: []
+        #app_service_config_files:
+        #  - app_service_1.yaml
+        #  - app_service_2.yaml
 
-        # Whether or not to track application service IP addresses. Implicitly
+        # Uncomment to enable tracking of application service IP addresses. Implicitly
         # enables MAU tracking for application service users.
         #
-        track_appservice_user_ips: False
+        #track_appservice_user_ips: True
         """
 
 
diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py
index d25196be08..f7eebf26d2 100644
--- a/synapse/config/captcha.py
+++ b/synapse/config/captcha.py
@@ -18,11 +18,16 @@ from ._base import Config
 class CaptchaConfig(Config):
 
     def read_config(self, config):
-        self.recaptcha_private_key = config["recaptcha_private_key"]
-        self.recaptcha_public_key = config["recaptcha_public_key"]
-        self.enable_registration_captcha = config["enable_registration_captcha"]
+        self.recaptcha_private_key = config.get("recaptcha_private_key")
+        self.recaptcha_public_key = config.get("recaptcha_public_key")
+        self.enable_registration_captcha = config.get(
+            "enable_registration_captcha", False
+        )
         self.captcha_bypass_secret = config.get("captcha_bypass_secret")
-        self.recaptcha_siteverify_api = config["recaptcha_siteverify_api"]
+        self.recaptcha_siteverify_api = config.get(
+            "recaptcha_siteverify_api",
+            "https://www.recaptcha.net/recaptcha/api/siteverify",
+        )
 
     def default_config(self, **kwargs):
         return """\
@@ -31,21 +36,23 @@ class CaptchaConfig(Config):
 
         # This Home Server's ReCAPTCHA public key.
         #
-        recaptcha_public_key: "YOUR_PUBLIC_KEY"
+        #recaptcha_public_key: "YOUR_PUBLIC_KEY"
 
         # This Home Server's ReCAPTCHA private key.
         #
-        recaptcha_private_key: "YOUR_PRIVATE_KEY"
+        #recaptcha_private_key: "YOUR_PRIVATE_KEY"
 
         # Enables ReCaptcha checks when registering, preventing signup
         # unless a captcha is answered. Requires a valid ReCaptcha
         # public/private key.
         #
-        enable_registration_captcha: False
+        #enable_registration_captcha: false
 
         # A secret key used to bypass the captcha test entirely.
+        #
         #captcha_bypass_secret: "YOUR_SECRET_HERE"
 
         # The API endpoint to use for verifying m.login.recaptcha responses.
-        recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
+        #
+        #recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
         """
diff --git a/synapse/config/database.py b/synapse/config/database.py
index 63e9cb63f8..3c27ed6b4a 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -60,7 +60,8 @@ class DatabaseConfig(Config):
             database: "%(database_path)s"
 
         # Number of events to cache in memory.
-        event_cache_size: "10K"
+        #
+        #event_cache_size: 10K
         """ % locals()
 
     def read_arguments(self, args):
diff --git a/synapse/config/groups.py b/synapse/config/groups.py
index 46933a904c..e4be172a79 100644
--- a/synapse/config/groups.py
+++ b/synapse/config/groups.py
@@ -23,9 +23,9 @@ class GroupsConfig(Config):
 
     def default_config(self, **kwargs):
         return """\
-        # Whether to allow non server admins to create groups on this server
+        # Uncomment to allow non-server-admin users to create groups on this server
         #
-        enable_group_creation: false
+        #enable_group_creation: true
 
         # If enabled, non server admins can only create groups with local parts
         # starting with this prefix
diff --git a/synapse/config/key.py b/synapse/config/key.py
index 35f05fa974..933928885a 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -38,15 +38,26 @@ logger = logging.getLogger(__name__)
 class KeyConfig(Config):
 
     def read_config(self, config):
-        self.signing_key = self.read_signing_key(config["signing_key_path"])
+        # the signing key can be specified inline or in a separate file
+        if "signing_key" in config:
+            self.signing_key = read_signing_keys([config["signing_key"]])
+        else:
+            self.signing_key = self.read_signing_key(config["signing_key_path"])
+
         self.old_signing_keys = self.read_old_signing_keys(
             config.get("old_signing_keys", {})
         )
         self.key_refresh_interval = self.parse_duration(
-            config["key_refresh_interval"]
+            config.get("key_refresh_interval", "1d"),
         )
         self.perspectives = self.read_perspectives(
-            config["perspectives"]
+            config.get("perspectives", {}).get("servers", {
+                "matrix.org": {"verify_keys": {
+                    "ed25519:auto": {
+                        "key": "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw",
+                    }
+                }}
+            })
         )
 
         self.macaroon_secret_key = config.get(
@@ -88,7 +99,7 @@ class KeyConfig(Config):
 
         # Used to enable access token expiration.
         #
-        expire_access_token: False
+        #expire_access_token: False
 
         # a secret which is used to calculate HMACs for form values, to stop
         # falsification of values. Must be specified for the User Consent
@@ -117,21 +128,21 @@ class KeyConfig(Config):
         # Determines how quickly servers will query to check which keys
         # are still valid.
         #
-        key_refresh_interval: "1d" # 1 Day.
+        #key_refresh_interval: 1d
 
         # The trusted servers to download signing keys from.
         #
-        perspectives:
-          servers:
-            "matrix.org":
-              verify_keys:
-                "ed25519:auto":
-                  key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
+        #perspectives:
+        #  servers:
+        #    "matrix.org":
+        #      verify_keys:
+        #        "ed25519:auto":
+        #          key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
         """ % locals()
 
-    def read_perspectives(self, perspectives_config):
+    def read_perspectives(self, perspectives_servers):
         servers = {}
-        for server_name, server_config in perspectives_config["servers"].items():
+        for server_name, server_config in perspectives_servers.items():
             for key_id, key_data in server_config["verify_keys"].items():
                 if is_signing_algorithm_supported(key_id):
                     key_base64 = key_data["key"]
diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py
index ed0498c634..2de51979d8 100644
--- a/synapse/config/metrics.py
+++ b/synapse/config/metrics.py
@@ -24,7 +24,7 @@ MISSING_SENTRY = (
 
 class MetricsConfig(Config):
     def read_config(self, config):
-        self.enable_metrics = config["enable_metrics"]
+        self.enable_metrics = config.get("enable_metrics", False)
         self.report_stats = config.get("report_stats", None)
         self.metrics_port = config.get("metrics_port")
         self.metrics_bind_host = config.get("metrics_bind_host", "127.0.0.1")
@@ -48,7 +48,7 @@ class MetricsConfig(Config):
 
         # Enable collection and rendering of performance metrics
         #
-        enable_metrics: False
+        #enable_metrics: False
 
         # Enable sentry integration
         # NOTE: While attempts are made to ensure that the logs don't contain
diff --git a/synapse/config/password.py b/synapse/config/password.py
index 2a52b9db54..eea59e772b 100644
--- a/synapse/config/password.py
+++ b/synapse/config/password.py
@@ -22,16 +22,21 @@ class PasswordConfig(Config):
 
     def read_config(self, config):
         password_config = config.get("password_config", {})
+        if password_config is None:
+            password_config = {}
+
         self.password_enabled = password_config.get("enabled", True)
         self.password_pepper = password_config.get("pepper", "")
 
     def default_config(self, config_dir_path, server_name, **kwargs):
-        return """
-        # Enable password for login.
-        #
+        return """\
         password_config:
-           enabled: true
+           # Uncomment to disable password login
+           #
+           #enabled: false
+
            # Uncomment and change to a secret random string for extra security.
            # DO NOT CHANGE THIS AFTER INITIAL SETUP!
-           #pepper: ""
+           #
+           #pepper: "EVEN_MORE_SECRET"
         """
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index 093042fdb9..5a68399e63 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -15,23 +15,35 @@
 from ._base import Config
 
 
+class RateLimitConfig(object):
+    def __init__(self, config):
+        self.per_second = config.get("per_second", 0.17)
+        self.burst_count = config.get("burst_count", 3.0)
+
+
 class RatelimitConfig(Config):
 
     def read_config(self, config):
-        self.rc_messages_per_second = config["rc_messages_per_second"]
-        self.rc_message_burst_count = config["rc_message_burst_count"]
+        self.rc_messages_per_second = config.get("rc_messages_per_second", 0.2)
+        self.rc_message_burst_count = config.get("rc_message_burst_count", 10.0)
 
-        self.federation_rc_window_size = config["federation_rc_window_size"]
-        self.federation_rc_sleep_limit = config["federation_rc_sleep_limit"]
-        self.federation_rc_sleep_delay = config["federation_rc_sleep_delay"]
-        self.federation_rc_reject_limit = config["federation_rc_reject_limit"]
-        self.federation_rc_concurrent = config["federation_rc_concurrent"]
+        self.rc_registration = RateLimitConfig(config.get("rc_registration", {}))
 
-        self.rc_registration_requests_per_second = config.get(
-            "rc_registration_requests_per_second", 0.17,
+        rc_login_config = config.get("rc_login", {})
+        self.rc_login_address = RateLimitConfig(rc_login_config.get("address", {}))
+        self.rc_login_account = RateLimitConfig(rc_login_config.get("account", {}))
+        self.rc_login_failed_attempts = RateLimitConfig(
+            rc_login_config.get("failed_attempts", {}),
         )
-        self.rc_registration_request_burst_count = config.get(
-            "rc_registration_request_burst_count", 3,
+
+        self.federation_rc_window_size = config.get("federation_rc_window_size", 1000)
+        self.federation_rc_sleep_limit = config.get("federation_rc_sleep_limit", 10)
+        self.federation_rc_sleep_delay = config.get("federation_rc_sleep_delay", 500)
+        self.federation_rc_reject_limit = config.get("federation_rc_reject_limit", 50)
+        self.federation_rc_concurrent = config.get("federation_rc_concurrent", 3)
+
+        self.federation_rr_transactions_per_room_per_second = config.get(
+            "federation_rr_transactions_per_room_per_second", 50,
         )
 
     def default_config(self, **kwargs):
@@ -40,44 +52,75 @@ class RatelimitConfig(Config):
 
         # Number of messages a client can send per second
         #
-        rc_messages_per_second: 0.2
+        #rc_messages_per_second: 0.2
 
         # Number of message a client can send before being throttled
         #
-        rc_message_burst_count: 10.0
+        #rc_message_burst_count: 10.0
+
+        # Ratelimiting settings for registration and login.
+        #
+        # Each ratelimiting configuration is made of two parameters:
+        #   - per_second: number of requests a client can send per second.
+        #   - burst_count: number of requests a client can send before being throttled.
+        #
+        # Synapse currently uses the following configurations:
+        #   - one for registration that ratelimits registration requests based on the
+        #     client's IP address.
+        #   - one for login that ratelimits login requests based on the client's IP
+        #     address.
+        #   - one for login that ratelimits login requests based on the account the
+        #     client is attempting to log into.
+        #   - one for login that ratelimits login requests based on the account the
+        #     client is attempting to log into, based on the amount of failed login
+        #     attempts for this account.
+        #
+        # The defaults are as shown below.
+        #
+        #rc_registration:
+        #  per_second: 0.17
+        #  burst_count: 3
+        #
+        #rc_login:
+        #  address:
+        #    per_second: 0.17
+        #    burst_count: 3
+        #  account:
+        #    per_second: 0.17
+        #    burst_count: 3
+        #  failed_attempts:
+        #    per_second: 0.17
+        #    burst_count: 3
 
         # The federation window size in milliseconds
         #
-        federation_rc_window_size: 1000
+        #federation_rc_window_size: 1000
 
         # The number of federation requests from a single server in a window
         # before the server will delay processing the request.
         #
-        federation_rc_sleep_limit: 10
+        #federation_rc_sleep_limit: 10
 
         # The duration in milliseconds to delay processing events from
         # remote servers by if they go over the sleep limit.
         #
-        federation_rc_sleep_delay: 500
+        #federation_rc_sleep_delay: 500
 
         # The maximum number of concurrent federation requests allowed
         # from a single server
         #
-        federation_rc_reject_limit: 50
+        #federation_rc_reject_limit: 50
 
         # The number of federation requests to concurrently process from a
         # single server
         #
-        federation_rc_concurrent: 3
+        #federation_rc_concurrent: 3
 
-        # Number of registration requests a client can send per second.
-        # Defaults to 1/minute (0.17).
+        # Target outgoing federation transaction frequency for sending read-receipts,
+        # per-room.
         #
-        #rc_registration_requests_per_second: 0.17
-
-        # Number of registration requests a client can send before being
-        # throttled.
-        # Defaults to 3.
+        # If we end up trying to send out more read-receipts, they will get buffered up
+        # into fewer transactions.
         #
-        #rc_registration_request_burst_count: 3.0
+        #federation_rr_transactions_per_room_per_second: 50
         """
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index d34dc9e456..f6b2b9ceee 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -24,7 +24,7 @@ class RegistrationConfig(Config):
 
     def read_config(self, config):
         self.enable_registration = bool(
-            strtobool(str(config["enable_registration"]))
+            strtobool(str(config.get("enable_registration", False)))
         )
         if "disable_registration" in config:
             self.enable_registration = not bool(
@@ -36,7 +36,10 @@ class RegistrationConfig(Config):
         self.registration_shared_secret = config.get("registration_shared_secret")
 
         self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
-        self.trusted_third_party_id_servers = config["trusted_third_party_id_servers"]
+        self.trusted_third_party_id_servers = config.get(
+            "trusted_third_party_id_servers",
+            ["matrix.org", "vector.im"],
+        )
         self.default_identity_server = config.get("default_identity_server")
         self.allow_guest_access = config.get("allow_guest_access", False)
 
@@ -64,11 +67,13 @@ class RegistrationConfig(Config):
 
         return """\
         ## Registration ##
+        #
         # Registration can be rate-limited using the parameters in the "Ratelimiting"
         # section of this file.
 
         # Enable registration for new users.
-        enable_registration: False
+        #
+        #enable_registration: false
 
         # The user must provide all of the below types of 3PID when registering.
         #
@@ -79,7 +84,7 @@ class RegistrationConfig(Config):
         # Explicitly disable asking for MSISDNs from the registration
         # flow (overrides registrations_require_3pid if MSISDNs are set as required)
         #
-        #disable_msisdn_registration: True
+        #disable_msisdn_registration: true
 
         # Mandate that users are only allowed to associate certain formats of
         # 3PIDs with accounts on this server.
@@ -92,8 +97,8 @@ class RegistrationConfig(Config):
         #  - medium: msisdn
         #    pattern: '\\+44'
 
-        # If set, allows registration by anyone who also has the shared
-        # secret, even if registration is otherwise disabled.
+        # If set, allows registration of standard or admin accounts by anyone who
+        # has the shared secret, even if registration is otherwise disabled.
         #
         %(registration_shared_secret)s
 
@@ -103,13 +108,13 @@ class RegistrationConfig(Config):
         # N.B. that increasing this will exponentially increase the time required
         # to register or login - e.g. 24 => 2^24 rounds which will take >20 mins.
         #
-        bcrypt_rounds: 12
+        #bcrypt_rounds: 12
 
         # Allows users to register as guests without a password/email/etc, and
         # participate in rooms hosted on this server which have been made
         # accessible to anonymous users.
         #
-        allow_guest_access: False
+        #allow_guest_access: false
 
         # The identity server which we suggest that clients should use when users log
         # in on this server.
@@ -125,9 +130,9 @@ class RegistrationConfig(Config):
         # Also defines the ID server which will be called when an account is
         # deactivated (one will be picked arbitrarily).
         #
-        trusted_third_party_id_servers:
-          - matrix.org
-          - vector.im
+        #trusted_third_party_id_servers:
+        #  - matrix.org
+        #  - vector.im
 
         # Users who register on this homeserver will automatically be joined
         # to these rooms
@@ -141,7 +146,7 @@ class RegistrationConfig(Config):
         # Setting to false means that if the rooms are not manually created,
         # users cannot be auto-joined since they do not exist.
         #
-        autocreate_auto_join_rooms: true
+        #autocreate_auto_join_rooms: true
         """ % locals()
 
     def add_arguments(self, parser):
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 97db2a5b7a..3f34ad9b2a 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -19,6 +19,36 @@ from synapse.util.module_loader import load_module
 
 from ._base import Config, ConfigError
 
+DEFAULT_THUMBNAIL_SIZES = [
+    {
+        "width": 32,
+        "height": 32,
+        "method": "crop",
+    }, {
+        "width": 96,
+        "height": 96,
+        "method": "crop",
+    }, {
+        "width": 320,
+        "height": 240,
+        "method": "scale",
+    }, {
+        "width": 640,
+        "height": 480,
+        "method": "scale",
+    }, {
+        "width": 800,
+        "height": 600,
+        "method": "scale"
+    },
+]
+
+THUMBNAIL_SIZE_YAML = """\
+        #  - width: %(width)i
+        #    height: %(height)i
+        #    method: %(method)s
+"""
+
 MISSING_NETADDR = (
     "Missing netaddr library. This is required for URL preview API."
 )
@@ -77,9 +107,9 @@ def parse_thumbnail_requirements(thumbnail_sizes):
 
 class ContentRepositoryConfig(Config):
     def read_config(self, config):
-        self.max_upload_size = self.parse_size(config["max_upload_size"])
-        self.max_image_pixels = self.parse_size(config["max_image_pixels"])
-        self.max_spider_size = self.parse_size(config["max_spider_size"])
+        self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
+        self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
+        self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
 
         self.media_store_path = self.ensure_directory(config["media_store_path"])
 
@@ -139,9 +169,9 @@ class ContentRepositoryConfig(Config):
             )
 
         self.uploads_path = self.ensure_directory(config["uploads_path"])
-        self.dynamic_thumbnails = config["dynamic_thumbnails"]
+        self.dynamic_thumbnails = config.get("dynamic_thumbnails", False)
         self.thumbnail_requirements = parse_thumbnail_requirements(
-            config["thumbnail_sizes"]
+            config.get("thumbnail_sizes", DEFAULT_THUMBNAIL_SIZES),
         )
         self.url_preview_enabled = config.get("url_preview_enabled", False)
         if self.url_preview_enabled:
@@ -178,6 +208,13 @@ class ContentRepositoryConfig(Config):
     def default_config(self, data_dir_path, **kwargs):
         media_store = os.path.join(data_dir_path, "media_store")
         uploads_path = os.path.join(data_dir_path, "uploads")
+
+        formatted_thumbnail_sizes = "".join(
+            THUMBNAIL_SIZE_YAML % s for s in DEFAULT_THUMBNAIL_SIZES
+        )
+        # strip final NL
+        formatted_thumbnail_sizes = formatted_thumbnail_sizes[:-1]
+
         return r"""
         # Directory where uploaded images and attachments are stored.
         #
@@ -204,11 +241,11 @@ class ContentRepositoryConfig(Config):
 
         # The largest allowed upload size in bytes
         #
-        max_upload_size: "10M"
+        #max_upload_size: 10M
 
         # Maximum number of pixels that will be thumbnailed
         #
-        max_image_pixels: "32M"
+        #max_image_pixels: 32M
 
         # Whether to generate new thumbnails on the fly to precisely match
         # the resolution requested by the client. If true then whenever
@@ -216,32 +253,18 @@ class ContentRepositoryConfig(Config):
         # generate a new thumbnail. If false the server will pick a thumbnail
         # from a precalculated list.
         #
-        dynamic_thumbnails: false
+        #dynamic_thumbnails: false
 
         # List of thumbnails to precalculate when an image is uploaded.
         #
-        thumbnail_sizes:
-        - width: 32
-          height: 32
-          method: crop
-        - width: 96
-          height: 96
-          method: crop
-        - width: 320
-          height: 240
-          method: scale
-        - width: 640
-          height: 480
-          method: scale
-        - width: 800
-          height: 600
-          method: scale
+        #thumbnail_sizes:
+%(formatted_thumbnail_sizes)s
 
         # Is the preview URL API enabled?  If enabled, you *must* specify
         # an explicit url_preview_ip_range_blacklist of IPs that the spider is
         # denied from accessing.
         #
-        url_preview_enabled: False
+        #url_preview_enabled: false
 
         # List of IP address CIDR ranges that the URL preview spider is denied
         # from accessing.  There are no defaults: you must explicitly
@@ -306,6 +329,6 @@ class ContentRepositoryConfig(Config):
         #  - netloc: '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$'
 
         # The largest allowed URL preview spidering size in bytes
-        max_spider_size: "10M"
-
+        #
+        #max_spider_size: 10M
         """ % locals()
diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py
index 9b897abe3c..8a9fded4c5 100644
--- a/synapse/config/room_directory.py
+++ b/synapse/config/room_directory.py
@@ -20,6 +20,10 @@ from ._base import Config, ConfigError
 
 class RoomDirectoryConfig(Config):
     def read_config(self, config):
+        self.enable_room_list_search = config.get(
+            "enable_room_list_search", True,
+        )
+
         alias_creation_rules = config.get("alias_creation_rules")
 
         if alias_creation_rules is not None:
@@ -54,6 +58,12 @@ class RoomDirectoryConfig(Config):
 
     def default_config(self, config_dir_path, server_name, **kwargs):
         return """
+        # Uncomment to disable searching the public room list. When disabled
+        # blocks searching local and remote room lists for local and remote
+        # users by always returning an empty list for all queries.
+        #
+        #enable_room_list_search: false
+
         # The `alias_creation` option controls who's allowed to create aliases
         # on this server.
         #
diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py
index aff0a1f00c..39b9eb29c2 100644
--- a/synapse/config/saml2_config.py
+++ b/synapse/config/saml2_config.py
@@ -64,7 +64,7 @@ class SAML2Config(Config):
         }
 
     def default_config(self, config_dir_path, server_name, **kwargs):
-        return """
+        return """\
         # Enable SAML2 for registration and login. Uses pysaml2.
         #
         # `sp_config` is the configuration for the pysaml2 Service Provider.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 35a322fee0..08e4e45482 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -45,7 +45,7 @@ class ServerConfig(Config):
 
         self.pid_file = self.abspath(config.get("pid_file"))
         self.web_client_location = config.get("web_client_location", None)
-        self.soft_file_limit = config["soft_file_limit"]
+        self.soft_file_limit = config.get("soft_file_limit", 0)
         self.daemonize = config.get("daemonize")
         self.print_pidfile = config.get("print_pidfile")
         self.user_agent_suffix = config.get("user_agent_suffix")
@@ -126,6 +126,11 @@ class ServerConfig(Config):
                 self.public_baseurl += '/'
         self.start_pushers = config.get("start_pushers", True)
 
+        # (undocumented) option for torturing the worker-mode replication a bit,
+        # for testing. The value defines the number of milliseconds to pause before
+        # sending out any replication updates.
+        self.replication_torture_level = config.get("replication_torture_level")
+
         self.listeners = []
         for listener in config.get("listeners", []):
             if not isinstance(listener.get("port", None), int):
@@ -307,11 +312,11 @@ class ServerConfig(Config):
         # Zero is used to indicate synapse should set the soft limit to the
         # hard limit.
         #
-        soft_file_limit: 0
+        #soft_file_limit: 0
 
         # Set to false to disable presence tracking on this homeserver.
         #
-        use_presence: true
+        #use_presence: false
 
         # The GC threshold parameters to pass to `gc.set_threshold`, if defined
         #
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 40045de7ac..f0014902da 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -181,6 +181,11 @@ class TlsConfig(Config):
         # See 'ACME support' below to enable auto-provisioning this certificate via
         # Let's Encrypt.
         #
+        # If supplying your own, be sure to use a `.pem` file that includes the
+        # full certificate chain including any intermediate certificates (for
+        # instance, if using certbot, use `fullchain.pem` as your certificate,
+        # not `cert.pem`).
+        #
         #tls_certificate_path: "%(tls_certificate_path)s"
 
         # PEM-encoded private key for TLS
diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py
index fab3a7d1c8..142754a7dc 100644
--- a/synapse/config/user_directory.py
+++ b/synapse/config/user_directory.py
@@ -22,9 +22,13 @@ class UserDirectoryConfig(Config):
     """
 
     def read_config(self, config):
+        self.user_directory_search_enabled = True
         self.user_directory_search_all_users = False
         user_directory_config = config.get("user_directory", None)
         if user_directory_config:
+            self.user_directory_search_enabled = (
+                user_directory_config.get("enabled", True)
+            )
             self.user_directory_search_all_users = (
                 user_directory_config.get("search_all_users", False)
             )
@@ -33,6 +37,10 @@ class UserDirectoryConfig(Config):
         return """
         # User Directory configuration
         #
+        # 'enabled' defines whether users can search the user directory. If
+        # false then empty responses are returned to all queries. Defaults to
+        # true.
+        #
         # 'search_all_users' defines whether to search all users visible to your HS
         # when searching the user directory, rather than limiting to users visible
         # in public rooms.  Defaults to false.  If you set it True, you'll have to run
@@ -40,5 +48,6 @@ class UserDirectoryConfig(Config):
         # on your database to tell it to rebuild the user_directory search indexes.
         #
         #user_directory:
+        #  enabled: true
         #  search_all_users: false
         """
diff --git a/synapse/config/voip.py b/synapse/config/voip.py
index 257f7c86e7..2a1f005a37 100644
--- a/synapse/config/voip.py
+++ b/synapse/config/voip.py
@@ -22,7 +22,9 @@ class VoipConfig(Config):
         self.turn_shared_secret = config.get("turn_shared_secret")
         self.turn_username = config.get("turn_username")
         self.turn_password = config.get("turn_password")
-        self.turn_user_lifetime = self.parse_duration(config["turn_user_lifetime"])
+        self.turn_user_lifetime = self.parse_duration(
+            config.get("turn_user_lifetime", "1h"),
+        )
         self.turn_allow_guests = config.get("turn_allow_guests", True)
 
     def default_config(self, **kwargs):
@@ -45,7 +47,7 @@ class VoipConfig(Config):
 
         # How long generated TURN credentials last
         #
-        turn_user_lifetime: "1h"
+        #turn_user_lifetime: 1h
 
         # Whether guests should be allowed to use the TURN server.
         # This defaults to True, otherwise VoIP will be unreliable for guests.
@@ -53,5 +55,5 @@ class VoipConfig(Config):
         # connect to arbitrary endpoints without having first signed up for a
         # valid account (e.g. by passing a CAPTCHA).
         #
-        turn_allow_guests: True
+        #turn_allow_guests: True
         """
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 80baf0ce0e..bfbd8b6c91 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -28,7 +28,7 @@ class WorkerConfig(Config):
         if self.worker_app == "synapse.app.homeserver":
             self.worker_app = None
 
-        self.worker_listeners = config.get("worker_listeners")
+        self.worker_listeners = config.get("worker_listeners", [])
         self.worker_daemonize = config.get("worker_daemonize")
         self.worker_pid_file = config.get("worker_pid_file")
         self.worker_log_file = config.get("worker_log_file")
@@ -48,6 +48,17 @@ class WorkerConfig(Config):
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
         self.worker_cpu_affinity = config.get("worker_cpu_affinity")
 
+        # This option is really only here to support `--manhole` command line
+        # argument.
+        manhole = config.get("worker_manhole")
+        if manhole:
+            self.worker_listeners.append({
+                "port": manhole,
+                "bind_addresses": ["127.0.0.1"],
+                "type": "manhole",
+                "tls": False,
+            })
+
         if self.worker_listeners:
             for listener in self.worker_listeners:
                 bind_address = listener.pop("bind_address", None)
@@ -57,3 +68,18 @@ class WorkerConfig(Config):
                     bind_addresses.append(bind_address)
                 elif not bind_addresses:
                     bind_addresses.append('')
+
+    def read_arguments(self, args):
+        # We support a bunch of command line arguments that override options in
+        # the config. A lot of these options have a worker_* prefix when running
+        # on workers so we also have to override them when command line options
+        # are specified.
+
+        if args.daemonize is not None:
+            self.worker_daemonize = args.daemonize
+        if args.log_config is not None:
+            self.worker_log_config = args.log_config
+        if args.log_file is not None:
+            self.worker_log_file = args.log_file
+        if args.manhole is not None:
+            self.worker_manhole = args.worker_manhole
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index b7d0b25781..04d04a4457 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -46,7 +46,7 @@ logger = logging.getLogger(__name__)
 
 
 class FederationRemoteSendQueue(object):
-    """A drop in replacement for TransactionQueue"""
+    """A drop in replacement for FederationSender"""
 
     def __init__(self, hs):
         self.server_name = hs.hostname
@@ -154,13 +154,13 @@ class FederationRemoteSendQueue(object):
                 del self.device_messages[key]
 
     def notify_new_events(self, current_id):
-        """As per TransactionQueue"""
+        """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
         # stream.
         pass
 
     def build_and_send_edu(self, destination, edu_type, content, key=None):
-        """As per TransactionQueue"""
+        """As per FederationSender"""
         if destination == self.server_name:
             logger.info("Not sending EDU to ourselves")
             return
@@ -183,8 +183,17 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
+    def send_read_receipt(self, receipt):
+        """As per FederationSender
+
+        Args:
+            receipt (synapse.types.ReadReceipt):
+        """
+        # nothing to do here: the replication listener will handle it.
+        pass
+
     def send_presence(self, states):
-        """As per TransactionQueue
+        """As per FederationSender
 
         Args:
             states (list(UserPresenceState))
@@ -201,7 +210,7 @@ class FederationRemoteSendQueue(object):
         self.notifier.on_new_replication_data()
 
     def send_device_messages(self, destination):
-        """As per TransactionQueue"""
+        """As per FederationSender"""
         pos = self._next_pos()
         self.device_messages[pos] = destination
         self.notifier.on_new_replication_data()
@@ -439,7 +448,7 @@ def process_rows_for_federation(transaction_queue, rows):
     transaction queue ready for sending to the relevant homeservers.
 
     Args:
-        transaction_queue (TransactionQueue)
+        transaction_queue (FederationSender)
         rows (list(synapse.replication.tcp.streams.FederationStreamRow))
     """
 
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
new file mode 100644
index 0000000000..1dc041752b
--- /dev/null
+++ b/synapse/federation/sender/__init__.py
@@ -0,0 +1,465 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from six import itervalues
+
+from prometheus_client import Counter
+
+from twisted.internet import defer
+
+import synapse.metrics
+from synapse.federation.sender.per_destination_queue import PerDestinationQueue
+from synapse.federation.sender.transaction_manager import TransactionManager
+from synapse.federation.units import Edu
+from synapse.handlers.presence import get_interested_remotes
+from synapse.metrics import (
+    LaterGauge,
+    event_processing_loop_counter,
+    event_processing_loop_room_count,
+    events_processed_counter,
+)
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
+from synapse.util.metrics import measure_func
+
+logger = logging.getLogger(__name__)
+
+sent_pdus_destination_dist_count = Counter(
+    "synapse_federation_client_sent_pdu_destinations:count",
+    "Number of PDUs queued for sending to one or more destinations",
+)
+
+sent_pdus_destination_dist_total = Counter(
+    "synapse_federation_client_sent_pdu_destinations:total", ""
+    "Total number of PDUs queued for sending across all destinations",
+)
+
+
+class FederationSender(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.server_name = hs.hostname
+
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+
+        self.clock = hs.get_clock()
+        self.is_mine_id = hs.is_mine_id
+
+        self._transaction_manager = TransactionManager(hs)
+
+        # map from destination to PerDestinationQueue
+        self._per_destination_queues = {}   # type: dict[str, PerDestinationQueue]
+
+        LaterGauge(
+            "synapse_federation_transaction_queue_pending_destinations",
+            "",
+            [],
+            lambda: sum(
+                1 for d in self._per_destination_queues.values()
+                if d.transmission_loop_running
+            ),
+        )
+
+        # Map of user_id -> UserPresenceState for all the pending presence
+        # to be sent out by user_id. Entries here get processed and put in
+        # pending_presence_by_dest
+        self.pending_presence = {}
+
+        LaterGauge(
+            "synapse_federation_transaction_queue_pending_pdus",
+            "",
+            [],
+            lambda: sum(
+                d.pending_pdu_count() for d in self._per_destination_queues.values()
+            ),
+        )
+        LaterGauge(
+            "synapse_federation_transaction_queue_pending_edus",
+            "",
+            [],
+            lambda: sum(
+                d.pending_edu_count() for d in self._per_destination_queues.values()
+            ),
+        )
+
+        self._order = 1
+
+        self._is_processing = False
+        self._last_poked_id = -1
+
+        self._processing_pending_presence = False
+
+        # map from room_id to a set of PerDestinationQueues which we believe are
+        # awaiting a call to flush_read_receipts_for_room. The presence of an entry
+        # here for a given room means that we are rate-limiting RR flushes to that room,
+        # and that there is a pending call to _flush_rrs_for_room in the system.
+        self._queues_awaiting_rr_flush_by_room = {
+        }   # type: dict[str, set[PerDestinationQueue]]
+
+        self._rr_txn_interval_per_room_ms = (
+            1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second
+        )
+
+    def _get_per_destination_queue(self, destination):
+        """Get or create a PerDestinationQueue for the given destination
+
+        Args:
+            destination (str): server_name of remote server
+
+        Returns:
+            PerDestinationQueue
+        """
+        queue = self._per_destination_queues.get(destination)
+        if not queue:
+            queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
+            self._per_destination_queues[destination] = queue
+        return queue
+
+    def notify_new_events(self, current_id):
+        """This gets called when we have some new events we might want to
+        send out to other servers.
+        """
+        self._last_poked_id = max(current_id, self._last_poked_id)
+
+        if self._is_processing:
+            return
+
+        # fire off a processing loop in the background
+        run_as_background_process(
+            "process_event_queue_for_federation",
+            self._process_event_queue_loop,
+        )
+
+    @defer.inlineCallbacks
+    def _process_event_queue_loop(self):
+        try:
+            self._is_processing = True
+            while True:
+                last_token = yield self.store.get_federation_out_pos("events")
+                next_token, events = yield self.store.get_all_new_events_stream(
+                    last_token, self._last_poked_id, limit=100,
+                )
+
+                logger.debug("Handling %s -> %s", last_token, next_token)
+
+                if not events and next_token >= self._last_poked_id:
+                    break
+
+                @defer.inlineCallbacks
+                def handle_event(event):
+                    # Only send events for this server.
+                    send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
+                    is_mine = self.is_mine_id(event.sender)
+                    if not is_mine and send_on_behalf_of is None:
+                        return
+
+                    try:
+                        # Get the state from before the event.
+                        # We need to make sure that this is the state from before
+                        # the event and not from after it.
+                        # Otherwise if the last member on a server in a room is
+                        # banned then it won't receive the event because it won't
+                        # be in the room after the ban.
+                        destinations = yield self.state.get_current_hosts_in_room(
+                            event.room_id, latest_event_ids=event.prev_event_ids(),
+                        )
+                    except Exception:
+                        logger.exception(
+                            "Failed to calculate hosts in room for event: %s",
+                            event.event_id,
+                        )
+                        return
+
+                    destinations = set(destinations)
+
+                    if send_on_behalf_of is not None:
+                        # If we are sending the event on behalf of another server
+                        # then it already has the event and there is no reason to
+                        # send the event to it.
+                        destinations.discard(send_on_behalf_of)
+
+                    logger.debug("Sending %s to %r", event, destinations)
+
+                    self._send_pdu(event, destinations)
+
+                @defer.inlineCallbacks
+                def handle_room_events(events):
+                    for event in events:
+                        yield handle_event(event)
+
+                events_by_room = {}
+                for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                yield logcontext.make_deferred_yieldable(defer.gatherResults(
+                    [
+                        logcontext.run_in_background(handle_room_events, evs)
+                        for evs in itervalues(events_by_room)
+                    ],
+                    consumeErrors=True
+                ))
+
+                yield self.store.update_federation_out_pos(
+                    "events", next_token
+                )
+
+                if events:
+                    now = self.clock.time_msec()
+                    ts = yield self.store.get_received_ts(events[-1].event_id)
+
+                    synapse.metrics.event_processing_lag.labels(
+                        "federation_sender").set(now - ts)
+                    synapse.metrics.event_processing_last_ts.labels(
+                        "federation_sender").set(ts)
+
+                    events_processed_counter.inc(len(events))
+
+                    event_processing_loop_room_count.labels(
+                        "federation_sender"
+                    ).inc(len(events_by_room))
+
+                event_processing_loop_counter.labels("federation_sender").inc()
+
+                synapse.metrics.event_processing_positions.labels(
+                    "federation_sender").set(next_token)
+
+        finally:
+            self._is_processing = False
+
+    def _send_pdu(self, pdu, destinations):
+        # We loop through all destinations to see whether we already have
+        # a transaction in progress. If we do, stick it in the pending_pdus
+        # table and we'll get back to it later.
+
+        order = self._order
+        self._order += 1
+
+        destinations = set(destinations)
+        destinations.discard(self.server_name)
+        logger.debug("Sending to: %s", str(destinations))
+
+        if not destinations:
+            return
+
+        sent_pdus_destination_dist_total.inc(len(destinations))
+        sent_pdus_destination_dist_count.inc()
+
+        for destination in destinations:
+            self._get_per_destination_queue(destination).send_pdu(pdu, order)
+
+    @defer.inlineCallbacks
+    def send_read_receipt(self, receipt):
+        """Send a RR to any other servers in the room
+
+        Args:
+            receipt (synapse.types.ReadReceipt): receipt to be sent
+        """
+
+        # Some background on the rate-limiting going on here.
+        #
+        # It turns out that if we attempt to send out RRs as soon as we get them from
+        # a client, then we end up trying to do several hundred Hz of federation
+        # transactions. (The number of transactions scales as O(N^2) on the size of a
+        # room, since in a large room we have both more RRs coming in, and more servers
+        # to send them to.)
+        #
+        # This leads to a lot of CPU load, and we end up getting behind. The solution
+        # currently adopted is as follows:
+        #
+        # The first receipt in a given room is sent out immediately, at time T0. Any
+        # further receipts are, in theory, batched up for N seconds, where N is calculated
+        # based on the number of servers in the room to achieve a transaction frequency
+        # of around 50Hz. So, for example, if there were 100 servers in the room, then
+        # N would be 100 / 50Hz = 2 seconds.
+        #
+        # Then, after T+N, we flush out any receipts that have accumulated, and restart
+        # the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
+        # we stop the cycle and go back to the start.
+        #
+        # However, in practice, it is often possible to flush out receipts earlier: in
+        # particular, if we are sending a transaction to a given server anyway (for
+        # example, because we have a PDU or a RR in another room to send), then we may
+        # as well send out all of the pending RRs for that server. So it may be that
+        # by the time we get to T+N, we don't actually have any RRs left to send out.
+        # Nevertheless we continue to buffer up RRs for the room in question until we
+        # reach the point that no RRs arrive between timer ticks.
+        #
+        # For even more background, see https://github.com/matrix-org/synapse/issues/4730.
+
+        room_id = receipt.room_id
+
+        # Work out which remote servers should be poked and poke them.
+        domains = yield self.state.get_current_hosts_in_room(room_id)
+        domains = [d for d in domains if d != self.server_name]
+        if not domains:
+            return
+
+        queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(
+            room_id
+        )
+
+        # if there is no flush yet scheduled, we will send out these receipts with
+        # immediate flushes, and schedule the next flush for this room.
+        if queues_pending_flush is not None:
+            logger.debug("Queuing receipt for: %r", domains)
+        else:
+            logger.debug("Sending receipt to: %r", domains)
+            self._schedule_rr_flush_for_room(room_id, len(domains))
+
+        for domain in domains:
+            queue = self._get_per_destination_queue(domain)
+            queue.queue_read_receipt(receipt)
+
+            # if there is already a RR flush pending for this room, then make sure this
+            # destination is registered for the flush
+            if queues_pending_flush is not None:
+                queues_pending_flush.add(queue)
+            else:
+                queue.flush_read_receipts_for_room(room_id)
+
+    def _schedule_rr_flush_for_room(self, room_id, n_domains):
+        # that is going to cause approximately len(domains) transactions, so now back
+        # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
+        backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
+
+        logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
+        self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
+        self._queues_awaiting_rr_flush_by_room[room_id] = set()
+
+    def _flush_rrs_for_room(self, room_id):
+        queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
+        logger.debug("Flushing RRs in %s to %s", room_id, queues)
+
+        if not queues:
+            # no more RRs arrived for this room; we are done.
+            return
+
+        # schedule the next flush
+        self._schedule_rr_flush_for_room(room_id, len(queues))
+
+        for queue in queues:
+            queue.flush_read_receipts_for_room(room_id)
+
+    @logcontext.preserve_fn  # the caller should not yield on this
+    @defer.inlineCallbacks
+    def send_presence(self, states):
+        """Send the new presence states to the appropriate destinations.
+
+        This actually queues up the presence states ready for sending and
+        triggers a background task to process them and send out the transactions.
+
+        Args:
+            states (list(UserPresenceState))
+        """
+        if not self.hs.config.use_presence:
+            # No-op if presence is disabled.
+            return
+
+        # First we queue up the new presence by user ID, so multiple presence
+        # updates in quick successtion are correctly handled
+        # We only want to send presence for our own users, so lets always just
+        # filter here just in case.
+        self.pending_presence.update({
+            state.user_id: state for state in states
+            if self.is_mine_id(state.user_id)
+        })
+
+        # We then handle the new pending presence in batches, first figuring
+        # out the destinations we need to send each state to and then poking it
+        # to attempt a new transaction. We linearize this so that we don't
+        # accidentally mess up the ordering and send multiple presence updates
+        # in the wrong order
+        if self._processing_pending_presence:
+            return
+
+        self._processing_pending_presence = True
+        try:
+            while True:
+                states_map = self.pending_presence
+                self.pending_presence = {}
+
+                if not states_map:
+                    break
+
+                yield self._process_presence_inner(list(states_map.values()))
+        except Exception:
+            logger.exception("Error sending presence states to servers")
+        finally:
+            self._processing_pending_presence = False
+
+    @measure_func("txnqueue._process_presence")
+    @defer.inlineCallbacks
+    def _process_presence_inner(self, states):
+        """Given a list of states populate self.pending_presence_by_dest and
+        poke to send a new transaction to each destination
+
+        Args:
+            states (list(UserPresenceState))
+        """
+        hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
+
+        for destinations, states in hosts_and_states:
+            for destination in destinations:
+                if destination == self.server_name:
+                    continue
+                self._get_per_destination_queue(destination).send_presence(states)
+
+    def build_and_send_edu(self, destination, edu_type, content, key=None):
+        """Construct an Edu object, and queue it for sending
+
+        Args:
+            destination (str): name of server to send to
+            edu_type (str): type of EDU to send
+            content (dict): content of EDU
+            key (Any|None): clobbering key for this edu
+        """
+        if destination == self.server_name:
+            logger.info("Not sending EDU to ourselves")
+            return
+
+        edu = Edu(
+            origin=self.server_name,
+            destination=destination,
+            edu_type=edu_type,
+            content=content,
+        )
+
+        self.send_edu(edu, key)
+
+    def send_edu(self, edu, key):
+        """Queue an EDU for sending
+
+        Args:
+            edu (Edu): edu to send
+            key (Any|None): clobbering key for this edu
+        """
+        queue = self._get_per_destination_queue(edu.destination)
+        if key:
+            queue.send_keyed_edu(edu, key)
+        else:
+            queue.send_edu(edu)
+
+    def send_device_messages(self, destination):
+        if destination == self.server_name:
+            logger.info("Not sending device update to ourselves")
+            return
+
+        self._get_per_destination_queue(destination).attempt_new_transaction()
+
+    def get_current_token(self):
+        return 0
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
new file mode 100644
index 0000000000..be99211003
--- /dev/null
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -0,0 +1,378 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import datetime
+import logging
+
+from prometheus_client import Counter
+
+from twisted.internet import defer
+
+from synapse.api.errors import (
+    FederationDeniedError,
+    HttpResponseException,
+    RequestSendFailed,
+)
+from synapse.events import EventBase
+from synapse.federation.units import Edu
+from synapse.handlers.presence import format_user_presence_state
+from synapse.metrics import sent_transactions_counter
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage import UserPresenceState
+from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
+
+logger = logging.getLogger(__name__)
+
+
+sent_edus_counter = Counter(
+    "synapse_federation_client_sent_edus",
+    "Total number of EDUs successfully sent",
+)
+
+sent_edus_by_type = Counter(
+    "synapse_federation_client_sent_edus_by_type",
+    "Number of sent EDUs successfully sent, by event type",
+    ["type"],
+)
+
+
+class PerDestinationQueue(object):
+    """
+    Manages the per-destination transmission queues.
+
+    Args:
+        hs (synapse.HomeServer):
+        transaction_sender (TransactionManager):
+        destination (str): the server_name of the destination that we are managing
+            transmission for.
+    """
+    def __init__(self, hs, transaction_manager, destination):
+        self._server_name = hs.hostname
+        self._clock = hs.get_clock()
+        self._store = hs.get_datastore()
+        self._transaction_manager = transaction_manager
+
+        self._destination = destination
+        self.transmission_loop_running = False
+
+        # a list of tuples of (pending pdu, order)
+        self._pending_pdus = []    # type: list[tuple[EventBase, int]]
+        self._pending_edus = []    # type: list[Edu]
+
+        # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
+        # based on their key (e.g. typing events by room_id)
+        # Map of (edu_type, key) -> Edu
+        self._pending_edus_keyed = {}   # type: dict[tuple[str, str], Edu]
+
+        # Map of user_id -> UserPresenceState of pending presence to be sent to this
+        # destination
+        self._pending_presence = {}   # type: dict[str, UserPresenceState]
+
+        # room_id -> receipt_type -> user_id -> receipt_dict
+        self._pending_rrs = {}
+        self._rrs_pending_flush = False
+
+        # stream_id of last successfully sent to-device message.
+        # NB: may be a long or an int.
+        self._last_device_stream_id = 0
+
+        # stream_id of last successfully sent device list update.
+        self._last_device_list_stream_id = 0
+
+    def __str__(self):
+        return "PerDestinationQueue[%s]" % self._destination
+
+    def pending_pdu_count(self):
+        return len(self._pending_pdus)
+
+    def pending_edu_count(self):
+        return (
+            len(self._pending_edus)
+            + len(self._pending_presence)
+            + len(self._pending_edus_keyed)
+        )
+
+    def send_pdu(self, pdu, order):
+        """Add a PDU to the queue, and start the transmission loop if neccessary
+
+        Args:
+            pdu (EventBase): pdu to send
+            order (int):
+        """
+        self._pending_pdus.append((pdu, order))
+        self.attempt_new_transaction()
+
+    def send_presence(self, states):
+        """Add presence updates to the queue. Start the transmission loop if neccessary.
+
+        Args:
+            states (iterable[UserPresenceState]): presence to send
+        """
+        self._pending_presence.update({
+            state.user_id: state for state in states
+        })
+        self.attempt_new_transaction()
+
+    def queue_read_receipt(self, receipt):
+        """Add a RR to the list to be sent. Doesn't start the transmission loop yet
+        (see flush_read_receipts_for_room)
+
+        Args:
+            receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued
+        """
+        self._pending_rrs.setdefault(
+            receipt.room_id, {},
+        ).setdefault(
+            receipt.receipt_type, {}
+        )[receipt.user_id] = {
+            "event_ids": receipt.event_ids,
+            "data": receipt.data,
+        }
+
+    def flush_read_receipts_for_room(self, room_id):
+        # if we don't have any read-receipts for this room, it may be that we've already
+        # sent them out, so we don't need to flush.
+        if room_id not in self._pending_rrs:
+            return
+        self._rrs_pending_flush = True
+        self.attempt_new_transaction()
+
+    def send_keyed_edu(self, edu, key):
+        self._pending_edus_keyed[(edu.edu_type, key)] = edu
+        self.attempt_new_transaction()
+
+    def send_edu(self, edu):
+        self._pending_edus.append(edu)
+        self.attempt_new_transaction()
+
+    def attempt_new_transaction(self):
+        """Try to start a new transaction to this destination
+
+        If there is already a transaction in progress to this destination,
+        returns immediately. Otherwise kicks off the process of sending a
+        transaction in the background.
+        """
+        # list of (pending_pdu, deferred, order)
+        if self.transmission_loop_running:
+            # XXX: this can get stuck on by a never-ending
+            # request at which point pending_pdus just keeps growing.
+            # we need application-layer timeouts of some flavour of these
+            # requests
+            logger.debug(
+                "TX [%s] Transaction already in progress",
+                self._destination
+            )
+            return
+
+        logger.debug("TX [%s] Starting transaction loop", self._destination)
+
+        run_as_background_process(
+            "federation_transaction_transmission_loop",
+            self._transaction_transmission_loop,
+        )
+
+    @defer.inlineCallbacks
+    def _transaction_transmission_loop(self):
+        pending_pdus = []
+        try:
+            self.transmission_loop_running = True
+
+            # This will throw if we wouldn't retry. We do this here so we fail
+            # quickly, but we will later check this again in the http client,
+            # hence why we throw the result away.
+            yield get_retry_limiter(self._destination, self._clock, self._store)
+
+            pending_pdus = []
+            while True:
+                device_message_edus, device_stream_id, dev_list_id = (
+                    yield self._get_new_device_messages()
+                )
+
+                # BEGIN CRITICAL SECTION
+                #
+                # In order to avoid a race condition, we need to make sure that
+                # the following code (from popping the queues up to the point
+                # where we decide if we actually have any pending messages) is
+                # atomic - otherwise new PDUs or EDUs might arrive in the
+                # meantime, but not get sent because we hold the
+                # transmission_loop_running flag.
+
+                pending_pdus = self._pending_pdus
+
+                # We can only include at most 50 PDUs per transactions
+                pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
+
+                pending_edus = []
+
+                pending_edus.extend(self._get_rr_edus(force_flush=False))
+
+                # We can only include at most 100 EDUs per transactions
+                pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus)))
+
+                pending_edus.extend(
+                    self._pending_edus_keyed.values()
+                )
+
+                self._pending_edus_keyed = {}
+
+                pending_edus.extend(device_message_edus)
+
+                pending_presence = self._pending_presence
+                self._pending_presence = {}
+                if pending_presence:
+                    pending_edus.append(
+                        Edu(
+                            origin=self._server_name,
+                            destination=self._destination,
+                            edu_type="m.presence",
+                            content={
+                                "push": [
+                                    format_user_presence_state(
+                                        presence, self._clock.time_msec()
+                                    )
+                                    for presence in pending_presence.values()
+                                ]
+                            },
+                        )
+                    )
+
+                if pending_pdus:
+                    logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+                                 self._destination, len(pending_pdus))
+
+                if not pending_pdus and not pending_edus:
+                    logger.debug("TX [%s] Nothing to send", self._destination)
+                    self._last_device_stream_id = device_stream_id
+                    return
+
+                # if we've decided to send a transaction anyway, and we have room, we
+                # may as well send any pending RRs
+                if len(pending_edus) < 100:
+                    pending_edus.extend(self._get_rr_edus(force_flush=True))
+
+                # END CRITICAL SECTION
+
+                success = yield self._transaction_manager.send_new_transaction(
+                    self._destination, pending_pdus, pending_edus
+                )
+                if success:
+                    sent_transactions_counter.inc()
+                    sent_edus_counter.inc(len(pending_edus))
+                    for edu in pending_edus:
+                        sent_edus_by_type.labels(edu.edu_type).inc()
+                    # Remove the acknowledged device messages from the database
+                    # Only bother if we actually sent some device messages
+                    if device_message_edus:
+                        yield self._store.delete_device_msgs_for_remote(
+                            self._destination, device_stream_id
+                        )
+                        logger.info(
+                            "Marking as sent %r %r", self._destination, dev_list_id
+                        )
+                        yield self._store.mark_as_sent_devices_by_remote(
+                            self._destination, dev_list_id
+                        )
+
+                    self._last_device_stream_id = device_stream_id
+                    self._last_device_list_stream_id = dev_list_id
+                else:
+                    break
+        except NotRetryingDestination as e:
+            logger.debug(
+                "TX [%s] not ready for retry yet (next retry at %s) - "
+                "dropping transaction for now",
+                self._destination,
+                datetime.datetime.fromtimestamp(
+                    (e.retry_last_ts + e.retry_interval) / 1000.0
+                ),
+            )
+        except FederationDeniedError as e:
+            logger.info(e)
+        except HttpResponseException as e:
+            logger.warning(
+                "TX [%s] Received %d response to transaction: %s",
+                self._destination, e.code, e,
+            )
+        except RequestSendFailed as e:
+            logger.warning("TX [%s] Failed to send transaction: %s", self._destination, e)
+
+            for p, _ in pending_pdus:
+                logger.info("Failed to send event %s to %s", p.event_id,
+                            self._destination)
+        except Exception:
+            logger.exception(
+                "TX [%s] Failed to send transaction",
+                self._destination,
+            )
+            for p, _ in pending_pdus:
+                logger.info("Failed to send event %s to %s", p.event_id,
+                            self._destination)
+        finally:
+            # We want to be *very* sure we clear this after we stop processing
+            self.transmission_loop_running = False
+
+    def _get_rr_edus(self, force_flush):
+        if not self._pending_rrs:
+            return
+        if not force_flush and not self._rrs_pending_flush:
+            # not yet time for this lot
+            return
+
+        edu = Edu(
+            origin=self._server_name,
+            destination=self._destination,
+            edu_type="m.receipt",
+            content=self._pending_rrs,
+        )
+        self._pending_rrs = {}
+        self._rrs_pending_flush = False
+        yield edu
+
+    def _pop_pending_edus(self, limit):
+        pending_edus = self._pending_edus
+        pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:]
+        return pending_edus
+
+    @defer.inlineCallbacks
+    def _get_new_device_messages(self):
+        last_device_stream_id = self._last_device_stream_id
+        to_device_stream_id = self._store.get_to_device_stream_token()
+        contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
+            self._destination, last_device_stream_id, to_device_stream_id
+        )
+        edus = [
+            Edu(
+                origin=self._server_name,
+                destination=self._destination,
+                edu_type="m.direct_to_device",
+                content=content,
+            )
+            for content in contents
+        ]
+
+        last_device_list = self._last_device_list_stream_id
+        now_stream_id, results = yield self._store.get_devices_by_remote(
+            self._destination, last_device_list
+        )
+        edus.extend(
+            Edu(
+                origin=self._server_name,
+                destination=self._destination,
+                edu_type="m.device_list_update",
+                content=content,
+            )
+            for content in results
+        )
+        defer.returnValue((edus, stream_id, now_stream_id))
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
new file mode 100644
index 0000000000..35e6b8ff5b
--- /dev/null
+++ b/synapse/federation/sender/transaction_manager.py
@@ -0,0 +1,147 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import HttpResponseException
+from synapse.federation.persistence import TransactionActions
+from synapse.federation.units import Transaction
+from synapse.util.metrics import measure_func
+
+logger = logging.getLogger(__name__)
+
+
+class TransactionManager(object):
+    """Helper class which handles building and sending transactions
+
+    shared between PerDestinationQueue objects
+    """
+    def __init__(self, hs):
+        self._server_name = hs.hostname
+        self.clock = hs.get_clock()   # nb must be called this for @measure_func
+        self._store = hs.get_datastore()
+        self._transaction_actions = TransactionActions(self._store)
+        self._transport_layer = hs.get_federation_transport_client()
+
+        # HACK to get unique tx id
+        self._next_txn_id = int(self.clock.time_msec())
+
+    @measure_func("_send_new_transaction")
+    @defer.inlineCallbacks
+    def send_new_transaction(self, destination, pending_pdus, pending_edus):
+
+        # Sort based on the order field
+        pending_pdus.sort(key=lambda t: t[1])
+        pdus = [x[0] for x in pending_pdus]
+        edus = pending_edus
+
+        success = True
+
+        logger.debug("TX [%s] _attempt_new_transaction", destination)
+
+        txn_id = str(self._next_txn_id)
+
+        logger.debug(
+            "TX [%s] {%s} Attempting new transaction"
+            " (pdus: %d, edus: %d)",
+            destination, txn_id,
+            len(pdus),
+            len(edus),
+        )
+
+        logger.debug("TX [%s] Persisting transaction...", destination)
+
+        transaction = Transaction.create_new(
+            origin_server_ts=int(self.clock.time_msec()),
+            transaction_id=txn_id,
+            origin=self._server_name,
+            destination=destination,
+            pdus=pdus,
+            edus=edus,
+        )
+
+        self._next_txn_id += 1
+
+        yield self._transaction_actions.prepare_to_send(transaction)
+
+        logger.debug("TX [%s] Persisted transaction", destination)
+        logger.info(
+            "TX [%s] {%s} Sending transaction [%s],"
+            " (PDUs: %d, EDUs: %d)",
+            destination, txn_id,
+            transaction.transaction_id,
+            len(pdus),
+            len(edus),
+        )
+
+        # Actually send the transaction
+
+        # FIXME (erikj): This is a bit of a hack to make the Pdu age
+        # keys work
+        def json_data_cb():
+            data = transaction.get_dict()
+            now = int(self.clock.time_msec())
+            if "pdus" in data:
+                for p in data["pdus"]:
+                    if "age_ts" in p:
+                        unsigned = p.setdefault("unsigned", {})
+                        unsigned["age"] = now - int(p["age_ts"])
+                        del p["age_ts"]
+            return data
+
+        try:
+            response = yield self._transport_layer.send_transaction(
+                transaction, json_data_cb
+            )
+            code = 200
+        except HttpResponseException as e:
+            code = e.code
+            response = e.response
+
+            if e.code in (401, 404, 429) or 500 <= e.code:
+                logger.info(
+                    "TX [%s] {%s} got %d response",
+                    destination, txn_id, code
+                )
+                raise e
+
+        logger.info(
+            "TX [%s] {%s} got %d response",
+            destination, txn_id, code
+        )
+
+        yield self._transaction_actions.delivered(
+            transaction, code, response
+        )
+
+        logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
+
+        if code == 200:
+            for e_id, r in response.get("pdus", {}).items():
+                if "error" in r:
+                    logger.warn(
+                        "TX [%s] {%s} Remote returned error for %s: %s",
+                        destination, txn_id, e_id, r,
+                    )
+        else:
+            for p in pdus:
+                logger.warn(
+                    "TX [%s] {%s} Failed to send event %s",
+                    destination, txn_id, p.event_id,
+                )
+            success = False
+
+        defer.returnValue(success)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
deleted file mode 100644
index e5e42c647d..0000000000
--- a/synapse/federation/transaction_queue.py
+++ /dev/null
@@ -1,716 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import datetime
-import logging
-
-from six import itervalues
-
-from prometheus_client import Counter
-
-from twisted.internet import defer
-
-import synapse.metrics
-from synapse.api.errors import (
-    FederationDeniedError,
-    HttpResponseException,
-    RequestSendFailed,
-)
-from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
-from synapse.metrics import (
-    LaterGauge,
-    event_processing_loop_counter,
-    event_processing_loop_room_count,
-    events_processed_counter,
-    sent_transactions_counter,
-)
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util import logcontext
-from synapse.util.metrics import measure_func
-from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
-
-from .persistence import TransactionActions
-from .units import Edu, Transaction
-
-logger = logging.getLogger(__name__)
-
-sent_pdus_destination_dist_count = Counter(
-    "synapse_federation_client_sent_pdu_destinations:count",
-    "Number of PDUs queued for sending to one or more destinations",
-)
-
-sent_pdus_destination_dist_total = Counter(
-    "synapse_federation_client_sent_pdu_destinations:total", ""
-    "Total number of PDUs queued for sending across all destinations",
-)
-
-sent_edus_counter = Counter(
-    "synapse_federation_client_sent_edus",
-    "Total number of EDUs successfully sent",
-)
-
-sent_edus_by_type = Counter(
-    "synapse_federation_client_sent_edus_by_type",
-    "Number of sent EDUs successfully sent, by event type",
-    ["type"],
-)
-
-
-class TransactionQueue(object):
-    """This class makes sure we only have one transaction in flight at
-    a time for a given destination.
-
-    It batches pending PDUs into single transactions.
-    """
-
-    def __init__(self, hs):
-        self.hs = hs
-        self.server_name = hs.hostname
-
-        self.store = hs.get_datastore()
-        self.state = hs.get_state_handler()
-        self.transaction_actions = TransactionActions(self.store)
-
-        self.transport_layer = hs.get_federation_transport_client()
-
-        self.clock = hs.get_clock()
-        self.is_mine_id = hs.is_mine_id
-
-        # Is a mapping from destinations -> deferreds. Used to keep track
-        # of which destinations have transactions in flight and when they are
-        # done
-        self.pending_transactions = {}
-
-        LaterGauge(
-            "synapse_federation_transaction_queue_pending_destinations",
-            "",
-            [],
-            lambda: len(self.pending_transactions),
-        )
-
-        # Is a mapping from destination -> list of
-        # tuple(pending pdus, deferred, order)
-        self.pending_pdus_by_dest = pdus = {}
-        # destination -> list of tuple(edu, deferred)
-        self.pending_edus_by_dest = edus = {}
-
-        # Map of user_id -> UserPresenceState for all the pending presence
-        # to be sent out by user_id. Entries here get processed and put in
-        # pending_presence_by_dest
-        self.pending_presence = {}
-
-        # Map of destination -> user_id -> UserPresenceState of pending presence
-        # to be sent to each destinations
-        self.pending_presence_by_dest = presence = {}
-
-        # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
-        # based on their key (e.g. typing events by room_id)
-        # Map of destination -> (edu_type, key) -> Edu
-        self.pending_edus_keyed_by_dest = edus_keyed = {}
-
-        LaterGauge(
-            "synapse_federation_transaction_queue_pending_pdus",
-            "",
-            [],
-            lambda: sum(map(len, pdus.values())),
-        )
-        LaterGauge(
-            "synapse_federation_transaction_queue_pending_edus",
-            "",
-            [],
-            lambda: (
-                sum(map(len, edus.values()))
-                + sum(map(len, presence.values()))
-                + sum(map(len, edus_keyed.values()))
-            ),
-        )
-
-        # destination -> stream_id of last successfully sent to-device message.
-        # NB: may be a long or an int.
-        self.last_device_stream_id_by_dest = {}
-
-        # destination -> stream_id of last successfully sent device list
-        # update.
-        self.last_device_list_stream_id_by_dest = {}
-
-        # HACK to get unique tx id
-        self._next_txn_id = int(self.clock.time_msec())
-
-        self._order = 1
-
-        self._is_processing = False
-        self._last_poked_id = -1
-
-        self._processing_pending_presence = False
-
-    def notify_new_events(self, current_id):
-        """This gets called when we have some new events we might want to
-        send out to other servers.
-        """
-        self._last_poked_id = max(current_id, self._last_poked_id)
-
-        if self._is_processing:
-            return
-
-        # fire off a processing loop in the background
-        run_as_background_process(
-            "process_event_queue_for_federation",
-            self._process_event_queue_loop,
-        )
-
-    @defer.inlineCallbacks
-    def _process_event_queue_loop(self):
-        try:
-            self._is_processing = True
-            while True:
-                last_token = yield self.store.get_federation_out_pos("events")
-                next_token, events = yield self.store.get_all_new_events_stream(
-                    last_token, self._last_poked_id, limit=100,
-                )
-
-                logger.debug("Handling %s -> %s", last_token, next_token)
-
-                if not events and next_token >= self._last_poked_id:
-                    break
-
-                @defer.inlineCallbacks
-                def handle_event(event):
-                    # Only send events for this server.
-                    send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
-                    is_mine = self.is_mine_id(event.sender)
-                    if not is_mine and send_on_behalf_of is None:
-                        return
-
-                    try:
-                        # Get the state from before the event.
-                        # We need to make sure that this is the state from before
-                        # the event and not from after it.
-                        # Otherwise if the last member on a server in a room is
-                        # banned then it won't receive the event because it won't
-                        # be in the room after the ban.
-                        destinations = yield self.state.get_current_hosts_in_room(
-                            event.room_id, latest_event_ids=event.prev_event_ids(),
-                        )
-                    except Exception:
-                        logger.exception(
-                            "Failed to calculate hosts in room for event: %s",
-                            event.event_id,
-                        )
-                        return
-
-                    destinations = set(destinations)
-
-                    if send_on_behalf_of is not None:
-                        # If we are sending the event on behalf of another server
-                        # then it already has the event and there is no reason to
-                        # send the event to it.
-                        destinations.discard(send_on_behalf_of)
-
-                    logger.debug("Sending %s to %r", event, destinations)
-
-                    self._send_pdu(event, destinations)
-
-                @defer.inlineCallbacks
-                def handle_room_events(events):
-                    for event in events:
-                        yield handle_event(event)
-
-                events_by_room = {}
-                for event in events:
-                    events_by_room.setdefault(event.room_id, []).append(event)
-
-                yield logcontext.make_deferred_yieldable(defer.gatherResults(
-                    [
-                        logcontext.run_in_background(handle_room_events, evs)
-                        for evs in itervalues(events_by_room)
-                    ],
-                    consumeErrors=True
-                ))
-
-                yield self.store.update_federation_out_pos(
-                    "events", next_token
-                )
-
-                if events:
-                    now = self.clock.time_msec()
-                    ts = yield self.store.get_received_ts(events[-1].event_id)
-
-                    synapse.metrics.event_processing_lag.labels(
-                        "federation_sender").set(now - ts)
-                    synapse.metrics.event_processing_last_ts.labels(
-                        "federation_sender").set(ts)
-
-                    events_processed_counter.inc(len(events))
-
-                    event_processing_loop_room_count.labels(
-                        "federation_sender"
-                    ).inc(len(events_by_room))
-
-                event_processing_loop_counter.labels("federation_sender").inc()
-
-                synapse.metrics.event_processing_positions.labels(
-                    "federation_sender").set(next_token)
-
-        finally:
-            self._is_processing = False
-
-    def _send_pdu(self, pdu, destinations):
-        # We loop through all destinations to see whether we already have
-        # a transaction in progress. If we do, stick it in the pending_pdus
-        # table and we'll get back to it later.
-
-        order = self._order
-        self._order += 1
-
-        destinations = set(destinations)
-        destinations.discard(self.server_name)
-        logger.debug("Sending to: %s", str(destinations))
-
-        if not destinations:
-            return
-
-        sent_pdus_destination_dist_total.inc(len(destinations))
-        sent_pdus_destination_dist_count.inc()
-
-        for destination in destinations:
-            self.pending_pdus_by_dest.setdefault(destination, []).append(
-                (pdu, order)
-            )
-
-            self._attempt_new_transaction(destination)
-
-    @logcontext.preserve_fn  # the caller should not yield on this
-    @defer.inlineCallbacks
-    def send_presence(self, states):
-        """Send the new presence states to the appropriate destinations.
-
-        This actually queues up the presence states ready for sending and
-        triggers a background task to process them and send out the transactions.
-
-        Args:
-            states (list(UserPresenceState))
-        """
-        if not self.hs.config.use_presence:
-            # No-op if presence is disabled.
-            return
-
-        # First we queue up the new presence by user ID, so multiple presence
-        # updates in quick successtion are correctly handled
-        # We only want to send presence for our own users, so lets always just
-        # filter here just in case.
-        self.pending_presence.update({
-            state.user_id: state for state in states
-            if self.is_mine_id(state.user_id)
-        })
-
-        # We then handle the new pending presence in batches, first figuring
-        # out the destinations we need to send each state to and then poking it
-        # to attempt a new transaction. We linearize this so that we don't
-        # accidentally mess up the ordering and send multiple presence updates
-        # in the wrong order
-        if self._processing_pending_presence:
-            return
-
-        self._processing_pending_presence = True
-        try:
-            while True:
-                states_map = self.pending_presence
-                self.pending_presence = {}
-
-                if not states_map:
-                    break
-
-                yield self._process_presence_inner(list(states_map.values()))
-        except Exception:
-            logger.exception("Error sending presence states to servers")
-        finally:
-            self._processing_pending_presence = False
-
-    @measure_func("txnqueue._process_presence")
-    @defer.inlineCallbacks
-    def _process_presence_inner(self, states):
-        """Given a list of states populate self.pending_presence_by_dest and
-        poke to send a new transaction to each destination
-
-        Args:
-            states (list(UserPresenceState))
-        """
-        hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
-
-        for destinations, states in hosts_and_states:
-            for destination in destinations:
-                if destination == self.server_name:
-                    continue
-
-                self.pending_presence_by_dest.setdefault(
-                    destination, {}
-                ).update({
-                    state.user_id: state for state in states
-                })
-
-                self._attempt_new_transaction(destination)
-
-    def build_and_send_edu(self, destination, edu_type, content, key=None):
-        """Construct an Edu object, and queue it for sending
-
-        Args:
-            destination (str): name of server to send to
-            edu_type (str): type of EDU to send
-            content (dict): content of EDU
-            key (Any|None): clobbering key for this edu
-        """
-        if destination == self.server_name:
-            logger.info("Not sending EDU to ourselves")
-            return
-
-        edu = Edu(
-            origin=self.server_name,
-            destination=destination,
-            edu_type=edu_type,
-            content=content,
-        )
-
-        self.send_edu(edu, key)
-
-    def send_edu(self, edu, key):
-        """Queue an EDU for sending
-
-        Args:
-            edu (Edu): edu to send
-            key (Any|None): clobbering key for this edu
-        """
-        if key:
-            self.pending_edus_keyed_by_dest.setdefault(
-                edu.destination, {}
-            )[(edu.edu_type, key)] = edu
-        else:
-            self.pending_edus_by_dest.setdefault(edu.destination, []).append(edu)
-
-        self._attempt_new_transaction(edu.destination)
-
-    def send_device_messages(self, destination):
-        if destination == self.server_name:
-            logger.info("Not sending device update to ourselves")
-            return
-
-        self._attempt_new_transaction(destination)
-
-    def get_current_token(self):
-        return 0
-
-    def _attempt_new_transaction(self, destination):
-        """Try to start a new transaction to this destination
-
-        If there is already a transaction in progress to this destination,
-        returns immediately. Otherwise kicks off the process of sending a
-        transaction in the background.
-
-        Args:
-            destination (str):
-
-        Returns:
-            None
-        """
-        # list of (pending_pdu, deferred, order)
-        if destination in self.pending_transactions:
-            # XXX: pending_transactions can get stuck on by a never-ending
-            # request at which point pending_pdus_by_dest just keeps growing.
-            # we need application-layer timeouts of some flavour of these
-            # requests
-            logger.debug(
-                "TX [%s] Transaction already in progress",
-                destination
-            )
-            return
-
-        logger.debug("TX [%s] Starting transaction loop", destination)
-
-        run_as_background_process(
-            "federation_transaction_transmission_loop",
-            self._transaction_transmission_loop,
-            destination,
-        )
-
-    @defer.inlineCallbacks
-    def _transaction_transmission_loop(self, destination):
-        pending_pdus = []
-        try:
-            self.pending_transactions[destination] = 1
-
-            # This will throw if we wouldn't retry. We do this here so we fail
-            # quickly, but we will later check this again in the http client,
-            # hence why we throw the result away.
-            yield get_retry_limiter(destination, self.clock, self.store)
-
-            pending_pdus = []
-            while True:
-                device_message_edus, device_stream_id, dev_list_id = (
-                    yield self._get_new_device_messages(destination)
-                )
-
-                # BEGIN CRITICAL SECTION
-                #
-                # In order to avoid a race condition, we need to make sure that
-                # the following code (from popping the queues up to the point
-                # where we decide if we actually have any pending messages) is
-                # atomic - otherwise new PDUs or EDUs might arrive in the
-                # meantime, but not get sent because we hold the
-                # pending_transactions flag.
-
-                pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
-
-                # We can only include at most 50 PDUs per transactions
-                pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
-                if leftover_pdus:
-                    self.pending_pdus_by_dest[destination] = leftover_pdus
-
-                pending_edus = self.pending_edus_by_dest.pop(destination, [])
-
-                # We can only include at most 100 EDUs per transactions
-                pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
-                if leftover_edus:
-                    self.pending_edus_by_dest[destination] = leftover_edus
-
-                pending_presence = self.pending_presence_by_dest.pop(destination, {})
-
-                pending_edus.extend(
-                    self.pending_edus_keyed_by_dest.pop(destination, {}).values()
-                )
-
-                pending_edus.extend(device_message_edus)
-                if pending_presence:
-                    pending_edus.append(
-                        Edu(
-                            origin=self.server_name,
-                            destination=destination,
-                            edu_type="m.presence",
-                            content={
-                                "push": [
-                                    format_user_presence_state(
-                                        presence, self.clock.time_msec()
-                                    )
-                                    for presence in pending_presence.values()
-                                ]
-                            },
-                        )
-                    )
-
-                if pending_pdus:
-                    logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
-                                 destination, len(pending_pdus))
-
-                if not pending_pdus and not pending_edus:
-                    logger.debug("TX [%s] Nothing to send", destination)
-                    self.last_device_stream_id_by_dest[destination] = (
-                        device_stream_id
-                    )
-                    return
-
-                # END CRITICAL SECTION
-
-                success = yield self._send_new_transaction(
-                    destination, pending_pdus, pending_edus,
-                )
-                if success:
-                    sent_transactions_counter.inc()
-                    sent_edus_counter.inc(len(pending_edus))
-                    for edu in pending_edus:
-                        sent_edus_by_type.labels(edu.edu_type).inc()
-                    # Remove the acknowledged device messages from the database
-                    # Only bother if we actually sent some device messages
-                    if device_message_edus:
-                        yield self.store.delete_device_msgs_for_remote(
-                            destination, device_stream_id
-                        )
-                        logger.info("Marking as sent %r %r", destination, dev_list_id)
-                        yield self.store.mark_as_sent_devices_by_remote(
-                            destination, dev_list_id
-                        )
-
-                    self.last_device_stream_id_by_dest[destination] = device_stream_id
-                    self.last_device_list_stream_id_by_dest[destination] = dev_list_id
-                else:
-                    break
-        except NotRetryingDestination as e:
-            logger.debug(
-                "TX [%s] not ready for retry yet (next retry at %s) - "
-                "dropping transaction for now",
-                destination,
-                datetime.datetime.fromtimestamp(
-                    (e.retry_last_ts + e.retry_interval) / 1000.0
-                ),
-            )
-        except FederationDeniedError as e:
-            logger.info(e)
-        except HttpResponseException as e:
-            logger.warning(
-                "TX [%s] Received %d response to transaction: %s",
-                destination, e.code, e,
-            )
-        except RequestSendFailed as e:
-            logger.warning("TX [%s] Failed to send transaction: %s", destination, e)
-
-            for p, _ in pending_pdus:
-                logger.info("Failed to send event %s to %s", p.event_id,
-                            destination)
-        except Exception:
-            logger.exception(
-                "TX [%s] Failed to send transaction",
-                destination,
-            )
-            for p, _ in pending_pdus:
-                logger.info("Failed to send event %s to %s", p.event_id,
-                            destination)
-        finally:
-            # We want to be *very* sure we delete this after we stop processing
-            self.pending_transactions.pop(destination, None)
-
-    @defer.inlineCallbacks
-    def _get_new_device_messages(self, destination):
-        last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0)
-        to_device_stream_id = self.store.get_to_device_stream_token()
-        contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
-            destination, last_device_stream_id, to_device_stream_id
-        )
-        edus = [
-            Edu(
-                origin=self.server_name,
-                destination=destination,
-                edu_type="m.direct_to_device",
-                content=content,
-            )
-            for content in contents
-        ]
-
-        last_device_list = self.last_device_list_stream_id_by_dest.get(destination, 0)
-        now_stream_id, results = yield self.store.get_devices_by_remote(
-            destination, last_device_list
-        )
-        edus.extend(
-            Edu(
-                origin=self.server_name,
-                destination=destination,
-                edu_type="m.device_list_update",
-                content=content,
-            )
-            for content in results
-        )
-        defer.returnValue((edus, stream_id, now_stream_id))
-
-    @measure_func("_send_new_transaction")
-    @defer.inlineCallbacks
-    def _send_new_transaction(self, destination, pending_pdus, pending_edus):
-
-        # Sort based on the order field
-        pending_pdus.sort(key=lambda t: t[1])
-        pdus = [x[0] for x in pending_pdus]
-        edus = pending_edus
-
-        success = True
-
-        logger.debug("TX [%s] _attempt_new_transaction", destination)
-
-        txn_id = str(self._next_txn_id)
-
-        logger.debug(
-            "TX [%s] {%s} Attempting new transaction"
-            " (pdus: %d, edus: %d)",
-            destination, txn_id,
-            len(pdus),
-            len(edus),
-        )
-
-        logger.debug("TX [%s] Persisting transaction...", destination)
-
-        transaction = Transaction.create_new(
-            origin_server_ts=int(self.clock.time_msec()),
-            transaction_id=txn_id,
-            origin=self.server_name,
-            destination=destination,
-            pdus=pdus,
-            edus=edus,
-        )
-
-        self._next_txn_id += 1
-
-        yield self.transaction_actions.prepare_to_send(transaction)
-
-        logger.debug("TX [%s] Persisted transaction", destination)
-        logger.info(
-            "TX [%s] {%s} Sending transaction [%s],"
-            " (PDUs: %d, EDUs: %d)",
-            destination, txn_id,
-            transaction.transaction_id,
-            len(pdus),
-            len(edus),
-        )
-
-        # Actually send the transaction
-
-        # FIXME (erikj): This is a bit of a hack to make the Pdu age
-        # keys work
-        def json_data_cb():
-            data = transaction.get_dict()
-            now = int(self.clock.time_msec())
-            if "pdus" in data:
-                for p in data["pdus"]:
-                    if "age_ts" in p:
-                        unsigned = p.setdefault("unsigned", {})
-                        unsigned["age"] = now - int(p["age_ts"])
-                        del p["age_ts"]
-            return data
-
-        try:
-            response = yield self.transport_layer.send_transaction(
-                transaction, json_data_cb
-            )
-            code = 200
-        except HttpResponseException as e:
-            code = e.code
-            response = e.response
-
-            if e.code in (401, 404, 429) or 500 <= e.code:
-                logger.info(
-                    "TX [%s] {%s} got %d response",
-                    destination, txn_id, code
-                )
-                raise e
-
-        logger.info(
-            "TX [%s] {%s} got %d response",
-            destination, txn_id, code
-        )
-
-        yield self.transaction_actions.delivered(
-            transaction, code, response
-        )
-
-        logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
-
-        if code == 200:
-            for e_id, r in response.get("pdus", {}).items():
-                if "error" in r:
-                    logger.warn(
-                        "TX [%s] {%s} Remote returned error for %s: %s",
-                        destination, txn_id, e_id, r,
-                    )
-        else:
-            for p in pdus:
-                logger.warn(
-                    "TX [%s] {%s} Failed to send event %s",
-                    destination, txn_id, p.event_id,
-                )
-            success = False
-
-        defer.returnValue(success)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index e424c40fdf..0cdb31178f 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -173,7 +173,7 @@ class TransportLayerClient(object):
         # generated by the json_data_callback.
         json_data = transaction.get_dict()
 
-        path = _create_v1_path("/send/%s", transaction.transaction_id)
+        path = _create_v1_path("/send/%s/", transaction.transaction_id)
 
         response = yield self.client.put_json(
             transaction.destination,
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index efb6bdca48..96d680a5ad 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -312,7 +312,7 @@ class BaseFederationServlet(object):
 
 
 class FederationSendServlet(BaseFederationServlet):
-    PATH = "/send/(?P<transaction_id>[^/]*)/?"
+    PATH = "/send/(?P<transaction_id>[^/]*)/"
 
     def __init__(self, handler, server_name, **kwargs):
         super(FederationSendServlet, self).__init__(
@@ -378,7 +378,7 @@ class FederationSendServlet(BaseFederationServlet):
 
 
 class FederationEventServlet(BaseFederationServlet):
-    PATH = "/event/(?P<event_id>[^/]*)/?"
+    PATH = "/event/(?P<event_id>[^/]*)/"
 
     # This is when someone asks for a data item for a given server data_id pair.
     def on_GET(self, origin, content, query, event_id):
@@ -386,7 +386,7 @@ class FederationEventServlet(BaseFederationServlet):
 
 
 class FederationStateServlet(BaseFederationServlet):
-    PATH = "/state/(?P<context>[^/]*)/?"
+    PATH = "/state/(?P<context>[^/]*)/"
 
     # This is when someone asks for all data for a given context.
     def on_GET(self, origin, content, query, context):
@@ -398,7 +398,7 @@ class FederationStateServlet(BaseFederationServlet):
 
 
 class FederationStateIdsServlet(BaseFederationServlet):
-    PATH = "/state_ids/(?P<room_id>[^/]*)/?"
+    PATH = "/state_ids/(?P<room_id>[^/]*)/"
 
     def on_GET(self, origin, content, query, room_id):
         return self.handler.on_state_ids_request(
@@ -409,7 +409,7 @@ class FederationStateIdsServlet(BaseFederationServlet):
 
 
 class FederationBackfillServlet(BaseFederationServlet):
-    PATH = "/backfill/(?P<context>[^/]*)/?"
+    PATH = "/backfill/(?P<context>[^/]*)/"
 
     def on_GET(self, origin, content, query, context):
         versions = [x.decode('ascii') for x in query[b"v"]]
@@ -1080,7 +1080,7 @@ class FederationGroupsCategoriesServlet(BaseFederationServlet):
     """Get all categories for a group
     """
     PATH = (
-        "/groups/(?P<group_id>[^/]*)/categories/?"
+        "/groups/(?P<group_id>[^/]*)/categories/"
     )
 
     @defer.inlineCallbacks
@@ -1150,7 +1150,7 @@ class FederationGroupsRolesServlet(BaseFederationServlet):
     """Get roles in a group
     """
     PATH = (
-        "/groups/(?P<group_id>[^/]*)/roles/?"
+        "/groups/(?P<group_id>[^/]*)/roles/"
     )
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index d8d86d6ff3..ac09d03ba9 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -165,6 +165,7 @@ class BaseHandler(object):
                     member_event.room_id,
                     "leave",
                     ratelimit=False,
+                    require_consent=False,
                 )
             except Exception as e:
                 logger.exception("Error kicking guest user: %s" % (e,))
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2abd9af94f..caad9ae2dd 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
 from synapse.util import logcontext
@@ -99,6 +100,11 @@ class AuthHandler(BaseHandler):
                         login_types.append(t)
         self._supported_login_types = login_types
 
+        self._account_ratelimiter = Ratelimiter()
+        self._failed_attempts_ratelimiter = Ratelimiter()
+
+        self._clock = self.hs.get_clock()
+
     @defer.inlineCallbacks
     def validate_user_via_ui_auth(self, requester, request_body, clientip):
         """
@@ -568,7 +574,12 @@ class AuthHandler(BaseHandler):
         Returns:
             defer.Deferred: (unicode) canonical_user_id, or None if zero or
             multiple matches
+
+        Raises:
+            LimitExceededError if the ratelimiter's login requests count for this
+                user is too high too proceed.
         """
+        self.ratelimit_login_per_account(user_id)
         res = yield self._find_user_id_and_pwd_hash(user_id)
         if res is not None:
             defer.returnValue(res[0])
@@ -634,6 +645,8 @@ class AuthHandler(BaseHandler):
             StoreError if there was a problem accessing the database
             SynapseError if there was a problem with the request
             LoginError if there was an authentication problem.
+            LimitExceededError if the ratelimiter's login requests count for this
+                user is too high too proceed.
         """
 
         if username.startswith('@'):
@@ -643,6 +656,8 @@ class AuthHandler(BaseHandler):
                 username, self.hs.hostname
             ).to_string()
 
+        self.ratelimit_login_per_account(qualified_user_id)
+
         login_type = login_submission.get("type")
         known_login_type = False
 
@@ -715,9 +730,16 @@ class AuthHandler(BaseHandler):
         if not known_login_type:
             raise SynapseError(400, "Unknown login type %s" % login_type)
 
-        # unknown username or invalid password. We raise a 403 here, but note
-        # that if we're doing user-interactive login, it turns all LoginErrors
-        # into a 401 anyway.
+        # unknown username or invalid password.
+        self._failed_attempts_ratelimiter.ratelimit(
+            qualified_user_id.lower(), time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=True,
+        )
+
+        # We raise a 403 here, but note that if we're doing user-interactive
+        # login, it turns all LoginErrors into a 401 anyway.
         raise LoginError(
             403, "Invalid password",
             errcode=Codes.FORBIDDEN
@@ -735,6 +757,10 @@ class AuthHandler(BaseHandler):
             password (unicode): the provided password
         Returns:
             (unicode) the canonical_user_id, or None if unknown user / bad password
+
+        Raises:
+            LimitExceededError if the ratelimiter's login requests count for this
+                user is too high too proceed.
         """
         lookupres = yield self._find_user_id_and_pwd_hash(user_id)
         if not lookupres:
@@ -763,6 +789,7 @@ class AuthHandler(BaseHandler):
             auth_api.validate_macaroon(macaroon, "login", True, user_id)
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+        self.ratelimit_login_per_account(user_id)
         yield self.auth.check_auth_blocking(user_id)
         defer.returnValue(user_id)
 
@@ -934,6 +961,33 @@ class AuthHandler(BaseHandler):
         else:
             return defer.succeed(False)
 
+    def ratelimit_login_per_account(self, user_id):
+        """Checks whether the process must be stopped because of ratelimiting.
+
+        Checks against two ratelimiters: the generic one for login attempts per
+        account and the one specific to failed attempts.
+
+        Args:
+            user_id (unicode): complete @user:id
+
+        Raises:
+            LimitExceededError if one of the ratelimiters' login requests count
+                for this user is too high too proceed.
+        """
+        self._failed_attempts_ratelimiter.ratelimit(
+            user_id.lower(), time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=False,
+        )
+
+        self._account_ratelimiter.ratelimit(
+            user_id.lower(), time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_account.per_second,
+            burst_count=self.hs.config.rc_login_account.burst_count,
+            update=True,
+        )
+
 
 @attr.s
 class MacaroonGenerator(object):
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 75fe50c42c..97d3f31d98 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -164,6 +164,7 @@ class DeactivateAccountHandler(BaseHandler):
                     room_id,
                     "leave",
                     ratelimit=False,
+                    require_consent=False,
                 )
             except Exception:
                 logger.exception(
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8b113307d2..fe128d9c88 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -44,6 +44,7 @@ class DirectoryHandler(BaseHandler):
         self.appservice_handler = hs.get_application_service_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.config = hs.config
+        self.enable_room_list_search = hs.config.enable_room_list_search
 
         self.federation = hs.get_federation_client()
         hs.get_federation_registry().register_query_handler(
@@ -411,6 +412,13 @@ class DirectoryHandler(BaseHandler):
         if visibility not in ["public", "private"]:
             raise SynapseError(400, "Invalid visibility setting")
 
+        if visibility == "public" and not self.enable_room_list_search:
+            # The room list has been disabled.
+            raise AuthError(
+                403,
+                "This user is not permitted to publish rooms to the room list"
+            )
+
         room = yield self.store.get_room(room_id)
         if room is None:
             raise SynapseError(400, "Unknown room")
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index f772e62c28..d883e98381 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -19,7 +19,7 @@ import random
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError
+from synapse.api.errors import AuthError, SynapseError
 from synapse.events import EventBase
 from synapse.events.utils import serialize_event
 from synapse.types import UserID
@@ -61,6 +61,11 @@ class EventStreamHandler(BaseHandler):
         If `only_keys` is not None, events from keys will be sent down.
         """
 
+        if room_id:
+            blocked = yield self.store.is_room_blocked(room_id)
+            if blocked:
+                raise SynapseError(403, "This room has been blocked on this server")
+
         # send any outstanding server notices to the user.
         yield self._server_notices_sender.on_user_syncing(auth_user_id)
 
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 563bb3cea3..7dfae78db0 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -18,7 +18,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes
+from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
@@ -262,6 +262,10 @@ class InitialSyncHandler(BaseHandler):
             A JSON serialisable dict with the snapshot of the room.
         """
 
+        blocked = yield self.store.is_room_blocked(room_id)
+        if blocked:
+            raise SynapseError(403, "This room has been blocked on this server")
+
         user_id = requester.user.to_string()
 
         membership, member_event_id = yield self._check_in_room_or_world_readable(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c762b58902..9b41c7b205 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -243,12 +243,19 @@ class EventCreationHandler(object):
 
         self.spam_checker = hs.get_spam_checker()
 
-        if self.config.block_events_without_consent_error is not None:
+        self._block_events_without_consent_error = (
+            self.config.block_events_without_consent_error
+        )
+
+        # we need to construct a ConsentURIBuilder here, as it checks that the necessary
+        # config options, but *only* if we have a configuration for which we are
+        # going to need it.
+        if self._block_events_without_consent_error:
             self._consent_uri_builder = ConsentURIBuilder(self.config)
 
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_events_and_hashes=None):
+                     prev_events_and_hashes=None, require_consent=True):
         """
         Given a dict from a client, create a new event.
 
@@ -269,6 +276,9 @@ class EventCreationHandler(object):
                 where *hashes* is a map from algorithm to hash.
 
                 If None, they will be requested from the database.
+
+            require_consent (bool): Whether to check if the requester has
+                consented to privacy policy.
         Raises:
             ResourceLimitError if server is blocked to some resource being
             exceeded
@@ -310,7 +320,7 @@ class EventCreationHandler(object):
                     )
 
         is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
-        if not is_exempt:
+        if require_consent and not is_exempt:
             yield self.assert_accepted_privacy_policy(requester)
 
         if token_id is not None:
@@ -378,7 +388,7 @@ class EventCreationHandler(object):
         Raises:
             ConsentNotGivenError: if the user has not given consent yet
         """
-        if self.config.block_events_without_consent_error is None:
+        if self._block_events_without_consent_error is None:
             return
 
         # exempt AS users from needing consent
@@ -405,7 +415,7 @@ class EventCreationHandler(object):
         consent_uri = self._consent_uri_builder.build_user_consent_uri(
             requester.user.localpart,
         )
-        msg = self.config.block_events_without_consent_error % {
+        msg = self._block_events_without_consent_error % {
             'consent_uri': consent_uri,
         }
         raise ConsentNotGivenError(
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 1728089667..274d2946ad 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,9 +16,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.types import get_domain_from_id
-
-from ._base import BaseHandler
+from synapse.handlers._base import BaseHandler
+from synapse.types import ReadReceipt
 
 logger = logging.getLogger(__name__)
 
@@ -42,13 +41,13 @@ class ReceiptsHandler(BaseHandler):
         """Called when we receive an EDU of type m.receipt from a remote HS.
         """
         receipts = [
-            {
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
-                "event_ids": user_values["event_ids"],
-                "data": user_values.get("data", {}),
-            }
+            ReadReceipt(
+                room_id=room_id,
+                receipt_type=receipt_type,
+                user_id=user_id,
+                event_ids=user_values["event_ids"],
+                data=user_values.get("data", {}),
+            )
             for room_id, room_values in content.items()
             for receipt_type, users in room_values.items()
             for user_id, user_values in users.items()
@@ -64,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
         max_batch_id = None
 
         for receipt in receipts:
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
             res = yield self.store.insert_receipt(
-                room_id, receipt_type, user_id, event_ids, data
+                receipt.room_id,
+                receipt.receipt_type,
+                receipt.user_id,
+                receipt.event_ids,
+                receipt.data,
             )
 
             if not res:
@@ -89,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
             # no new receipts
             defer.returnValue(False)
 
-        affected_room_ids = list(set([r["room_id"] for r in receipts]))
+        affected_room_ids = list(set([r.room_id for r in receipts]))
 
         self.notifier.on_new_event(
             "receipt_key", max_batch_id, rooms=affected_room_ids
@@ -107,49 +104,21 @@ class ReceiptsHandler(BaseHandler):
         """Called when a client tells us a local user has read up to the given
         event_id in the room.
         """
-        receipt = {
-            "room_id": room_id,
-            "receipt_type": receipt_type,
-            "user_id": user_id,
-            "event_ids": [event_id],
-            "data": {
+        receipt = ReadReceipt(
+            room_id=room_id,
+            receipt_type=receipt_type,
+            user_id=user_id,
+            event_ids=[event_id],
+            data={
                 "ts": int(self.clock.time_msec()),
-            }
-        }
+            },
+        )
 
         is_new = yield self._handle_new_receipts([receipt])
         if not is_new:
             return
 
-        # Work out which remote servers should be poked and poke them.
-
-        # TODO: optimise this to move some of the work to the workers.
-        data = receipt["data"]
-
-        # XXX why does this not use state.get_current_hosts_in_room() ?
-        users = yield self.state.get_current_user_in_room(room_id)
-        remotedomains = set(get_domain_from_id(u) for u in users)
-        remotedomains = remotedomains.copy()
-        remotedomains.discard(self.server_name)
-
-        logger.debug("Sending receipt to: %r", remotedomains)
-
-        for domain in remotedomains:
-            self.federation.build_and_send_edu(
-                destination=domain,
-                edu_type="m.receipt",
-                content={
-                    room_id: {
-                        receipt_type: {
-                            user_id: {
-                                "event_ids": [event_id],
-                                "data": data,
-                            }
-                        }
-                    },
-                },
-                key=(room_id, receipt_type, user_id),
-            )
+        yield self.federation.send_read_receipt(receipt)
 
     @defer.inlineCallbacks
     def get_receipts_for_room(self, room_id, to_key):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 03130edc54..68f73d3793 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,6 +23,7 @@ from synapse.api.constants import LoginType
 from synapse.api.errors import (
     AuthError,
     Codes,
+    ConsentNotGivenError,
     InvalidCaptchaError,
     LimitExceededError,
     RegistrationError,
@@ -311,6 +312,10 @@ class RegistrationHandler(BaseHandler):
                         )
                 else:
                     yield self._join_user_to_room(fake_requester, r)
+            except ConsentNotGivenError as e:
+                # Technically not necessary to pull out this error though
+                # moving away from bare excepts is a good thing to do.
+                logger.error("Failed to join new user to %r: %r", r, e)
             except Exception as e:
                 logger.error("Failed to join new user to %r: %r", r, e)
 
@@ -629,8 +634,8 @@ class RegistrationHandler(BaseHandler):
 
             allowed, time_allowed = self.ratelimiter.can_do_action(
                 address, time_now_s=time_now,
-                rate_hz=self.hs.config.rc_registration_requests_per_second,
-                burst_count=self.hs.config.rc_registration_request_burst_count,
+                rate_hz=self.hs.config.rc_registration.per_second,
+                burst_count=self.hs.config.rc_registration.burst_count,
             )
 
             if not allowed:
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index afa508d729..d6c9d56007 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,6 +44,7 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
+        self.enable_room_list_search = hs.config.enable_room_list_search
         self.response_cache = ResponseCache(hs, "room_list")
         self.remote_response_cache = ResponseCache(hs, "remote_room_list",
                                                    timeout_ms=30 * 1000)
@@ -66,10 +67,17 @@ class RoomListHandler(BaseHandler):
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
         """
+        if not self.enable_room_list_search:
+            return defer.succeed({
+                "chunk": [],
+                "total_room_count_estimate": 0,
+            })
+
         logger.info(
             "Getting public room list: limit=%r, since=%r, search=%r, network=%r",
             limit, since_token, bool(search_filter), network_tuple,
         )
+
         if search_filter:
             # We explicitly don't bother caching searches or requests for
             # appservice specific lists.
@@ -441,6 +449,12 @@ class RoomListHandler(BaseHandler):
     def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
                                     search_filter=None, include_all_networks=False,
                                     third_party_instance_id=None,):
+        if not self.enable_room_list_search:
+            defer.returnValue({
+                "chunk": [],
+                "total_room_count_estimate": 0,
+            })
+
         if search_filter:
             # We currently don't support searching across federation, so we have
             # to do it manually without pagination
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 190ea2c7b1..71ce5b54e5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -160,6 +160,7 @@ class RoomMemberHandler(object):
         txn_id=None,
         ratelimit=True,
         content=None,
+        require_consent=True,
     ):
         user_id = target.to_string()
 
@@ -185,6 +186,7 @@ class RoomMemberHandler(object):
             token_id=requester.access_token_id,
             txn_id=txn_id,
             prev_events_and_hashes=prev_events_and_hashes,
+            require_consent=require_consent,
         )
 
         # Check if this event matches the previous membership event for the user.
@@ -232,6 +234,10 @@ class RoomMemberHandler(object):
                 self.copy_room_tags_and_direct_to_room(
                     predecessor["room_id"], room_id, user_id,
                 )
+                # Move over old push rules
+                self.store.move_push_rules_from_room_to_room_for_user(
+                    predecessor["room_id"], room_id, user_id,
+                )
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -301,6 +307,7 @@ class RoomMemberHandler(object):
             third_party_signed=None,
             ratelimit=True,
             content=None,
+            require_consent=True,
     ):
         key = (room_id,)
 
@@ -315,6 +322,7 @@ class RoomMemberHandler(object):
                 third_party_signed=third_party_signed,
                 ratelimit=ratelimit,
                 content=content,
+                require_consent=require_consent,
             )
 
         defer.returnValue(result)
@@ -331,6 +339,7 @@ class RoomMemberHandler(object):
             third_party_signed=None,
             ratelimit=True,
             content=None,
+            require_consent=True,
     ):
         content_specified = bool(content)
         if content is None:
@@ -512,6 +521,7 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
             prev_events_and_hashes=prev_events_and_hashes,
             content=content,
+            require_consent=require_consent,
         )
         defer.returnValue(res)
 
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index c21da8343a..7dc0e236e7 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -38,18 +38,8 @@ class UserDirectoryHandler(object):
     world_readable or publically joinable room. We keep a database table up to date
     by streaming changes of the current state and recalculating whether users should
     be in the directory or not when necessary.
-
-    For each user in the directory we also store a room_id which is public and that the
-    user is joined to. This allows us to ignore history_visibility and join_rules changes
-    for that user in all other public rooms, as we know they'll still be in at least
-    one public room.
     """
 
-    INITIAL_ROOM_SLEEP_MS = 50
-    INITIAL_ROOM_SLEEP_COUNT = 100
-    INITIAL_ROOM_BATCH_SIZE = 100
-    INITIAL_USER_SLEEP_MS = 10
-
     def __init__(self, hs):
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
@@ -59,11 +49,6 @@ class UserDirectoryHandler(object):
         self.is_mine_id = hs.is_mine_id
         self.update_user_directory = hs.config.update_user_directory
         self.search_all_users = hs.config.user_directory_search_all_users
-
-        # When start up for the first time we need to populate the user_directory.
-        # This is a set of user_id's we've inserted already
-        self.initially_handled_users = set()
-
         # The current position in the current_state_delta stream
         self.pos = None
 
@@ -126,7 +111,7 @@ class UserDirectoryHandler(object):
         # Support users are for diagnostics and should not appear in the user directory.
         if not is_support:
             yield self.store.update_profile_in_user_dir(
-                user_id, profile.display_name, profile.avatar_url, None
+                user_id, profile.display_name, profile.avatar_url
             )
 
     @defer.inlineCallbacks
@@ -143,10 +128,9 @@ class UserDirectoryHandler(object):
         if self.pos is None:
             self.pos = yield self.store.get_user_directory_stream_pos()
 
-        # If still None then we need to do the initial fill of directory
+        # If still None then the initial background update hasn't happened yet
         if self.pos is None:
-            yield self._do_initial_spam()
-            self.pos = yield self.store.get_user_directory_stream_pos()
+            defer.returnValue(None)
 
         # Loop round handling deltas until we're up to date
         while True:
@@ -168,113 +152,6 @@ class UserDirectoryHandler(object):
                 yield self.store.update_user_directory_stream_pos(self.pos)
 
     @defer.inlineCallbacks
-    def _do_initial_spam(self):
-        """Populates the user_directory from the current state of the DB, used
-        when synapse first starts with user_directory support
-        """
-        new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
-
-        # Delete any existing entries just in case there are any
-        yield self.store.delete_all_from_user_dir()
-
-        # We process by going through each existing room at a time.
-        room_ids = yield self.store.get_all_rooms()
-
-        logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
-        num_processed_rooms = 0
-
-        for room_id in room_ids:
-            logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
-            yield self._handle_initial_room(room_id)
-            num_processed_rooms += 1
-            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-        logger.info("Processed all rooms.")
-
-        if self.search_all_users:
-            num_processed_users = 0
-            user_ids = yield self.store.get_all_local_users()
-            logger.info(
-                "Doing initial update of user directory. %d users", len(user_ids)
-            )
-            for user_id in user_ids:
-                # We add profiles for all users even if they don't match the
-                # include pattern, just in case we want to change it in future
-                logger.info(
-                    "Handling user %d/%d", num_processed_users + 1, len(user_ids)
-                )
-                yield self._handle_local_user(user_id)
-                num_processed_users += 1
-                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
-
-            logger.info("Processed all users")
-
-        self.initially_handled_users = None
-
-        yield self.store.update_user_directory_stream_pos(new_pos)
-
-    @defer.inlineCallbacks
-    def _handle_initial_room(self, room_id):
-        """
-        Called when we initially fill out user_directory one room at a time
-        """
-        is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
-        if not is_in_room:
-            return
-
-        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-            room_id
-        )
-
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
-        user_ids = set(users_with_profile)
-        unhandled_users = user_ids - self.initially_handled_users
-
-        yield self.store.add_profiles_to_user_dir(
-            {user_id: users_with_profile[user_id] for user_id in unhandled_users},
-        )
-
-        self.initially_handled_users |= unhandled_users
-
-        # We now go and figure out the new users who share rooms with user entries
-        # We sleep aggressively here as otherwise it can starve resources.
-        # We also batch up inserts/updates, but try to avoid too many at once.
-        to_insert = set()
-        count = 0
-        for user_id in user_ids:
-            if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-            if not self.is_mine_id(user_id):
-                count += 1
-                continue
-
-            if self.store.get_if_app_services_interested_in_user(user_id):
-                count += 1
-                continue
-
-            for other_user_id in user_ids:
-                if user_id == other_user_id:
-                    continue
-
-                if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-                count += 1
-
-                user_set = (user_id, other_user_id)
-                to_insert.add(user_set)
-
-                if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
-                    yield self.store.add_users_who_share_room(
-                        room_id, not is_public, to_insert
-                    )
-                    to_insert.clear()
-
-        if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
-            to_insert.clear()
-
-    @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
         """Called with the state deltas to process
         """
@@ -423,7 +300,9 @@ class UserDirectoryHandler(object):
 
         row = yield self.store.get_user_in_directory(user_id)
         if not row:
-            yield self.store.add_profiles_to_user_dir({user_id: profile})
+            yield self.store.update_profile_in_user_dir(
+                user_id, profile.display_name, profile.avatar_url
+            )
 
     @defer.inlineCallbacks
     def _handle_new_user(self, room_id, user_id, profile):
@@ -435,9 +314,9 @@ class UserDirectoryHandler(object):
         """
         logger.debug("Adding new user to dir, %r", user_id)
 
-        row = yield self.store.get_user_in_directory(user_id)
-        if not row:
-            yield self.store.add_profiles_to_user_dir({user_id: profile})
+        yield self.store.update_profile_in_user_dir(
+            user_id, profile.display_name, profile.avatar_url
+        )
 
         is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
             room_id
@@ -445,34 +324,39 @@ class UserDirectoryHandler(object):
         # Now we update users who share rooms with users.
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
 
-        to_insert = set()
+        if is_public:
+            yield self.store.add_users_in_public_rooms(room_id, (user_id,))
+        else:
+            to_insert = set()
 
-        # First, if they're our user then we need to update for every user
-        if self.is_mine_id(user_id):
+            # First, if they're our user then we need to update for every user
+            if self.is_mine_id(user_id):
 
-            is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+                is_appservice = self.store.get_if_app_services_interested_in_user(
+                    user_id
+                )
 
-            # We don't care about appservice users.
-            if not is_appservice:
-                for other_user_id in users_with_profile:
-                    if user_id == other_user_id:
-                        continue
+                # We don't care about appservice users.
+                if not is_appservice:
+                    for other_user_id in users_with_profile:
+                        if user_id == other_user_id:
+                            continue
 
-                    to_insert.add((user_id, other_user_id))
+                        to_insert.add((user_id, other_user_id))
 
-        # Next we need to update for every local user in the room
-        for other_user_id in users_with_profile:
-            if user_id == other_user_id:
-                continue
+            # Next we need to update for every local user in the room
+            for other_user_id in users_with_profile:
+                if user_id == other_user_id:
+                    continue
 
-            is_appservice = self.store.get_if_app_services_interested_in_user(
-                other_user_id
-            )
-            if self.is_mine_id(other_user_id) and not is_appservice:
-                to_insert.add((other_user_id, user_id))
+                is_appservice = self.store.get_if_app_services_interested_in_user(
+                    other_user_id
+                )
+                if self.is_mine_id(other_user_id) and not is_appservice:
+                    to_insert.add((other_user_id, user_id))
 
-        if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
+            if to_insert:
+                yield self.store.add_users_who_share_private_room(room_id, to_insert)
 
     @defer.inlineCallbacks
     def _handle_remove_user(self, room_id, user_id):
@@ -487,10 +371,10 @@ class UserDirectoryHandler(object):
         # Remove user from sharing tables
         yield self.store.remove_user_who_share_room(user_id, room_id)
 
-        # Are they still in a room with members? If not, remove them entirely.
-        users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id)
+        # Are they still in any rooms? If not, remove them entirely.
+        rooms_user_is_in = yield self.store.get_user_dir_rooms_user_is_in(user_id)
 
-        if len(users_in_room_with) == 0:
+        if len(rooms_user_is_in) == 0:
             yield self.store.remove_from_user_dir(user_id)
 
     @defer.inlineCallbacks
@@ -517,9 +401,7 @@ class UserDirectoryHandler(object):
         new_avatar = event.content.get("avatar_url")
 
         if prev_name != new_name or prev_avatar != new_avatar:
-            yield self.store.update_profile_in_user_dir(
-                user_id, new_name, new_avatar, room_id
-            )
+            yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
 
     @defer.inlineCallbacks
     def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index fd59f1595f..47cdf30bd3 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -16,6 +16,7 @@
 """
 
 import logging
+import random
 
 from six import itervalues
 
@@ -74,6 +75,8 @@ class ReplicationStreamer(object):
         self.notifier = hs.get_notifier()
         self._server_notices_sender = hs.get_server_notices_sender()
 
+        self._replication_torture_level = hs.config.replication_torture_level
+
         # Current connections.
         self.connections = []
 
@@ -157,10 +160,23 @@ class ReplicationStreamer(object):
                     for stream in self.streams:
                         stream.advance_current_token()
 
-                    for stream in self.streams:
+                    all_streams = self.streams
+
+                    if self._replication_torture_level is not None:
+                        # there is no guarantee about ordering between the streams,
+                        # so let's shuffle them around a bit when we are in torture mode.
+                        all_streams = list(all_streams)
+                        random.shuffle(all_streams)
+
+                    for stream in all_streams:
                         if stream.last_token == stream.upto_token:
                             continue
 
+                        if self._replication_torture_level:
+                            yield self.clock.sleep(
+                                self._replication_torture_level / 1000.0
+                            )
+
                         logger.debug(
                             "Getting stream: %s: %s -> %s",
                             stream.NAME, stream.last_token, stream.upto_token
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 2a29f0c2af..e788769639 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -490,40 +490,54 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
 
         requester_user_id = requester.user.to_string()
 
-        logger.info("Shutting down room %r", room_id)
+        logger.info(
+            "Shutting down room %r, joining to new room: %r",
+            room_id, new_room_id,
+        )
 
+        # This will work even if the room is already blocked, but that is
+        # desirable in case the first attempt at blocking the room failed below.
         yield self.store.block_room(room_id, requester_user_id)
 
         users = yield self.state.get_current_user_in_room(room_id)
         kicked_users = []
+        failed_to_kick_users = []
         for user_id in users:
             if not self.hs.is_mine_id(user_id):
                 continue
 
             logger.info("Kicking %r from %r...", user_id, room_id)
 
-            target_requester = create_requester(user_id)
-            yield self.room_member_handler.update_membership(
-                requester=target_requester,
-                target=target_requester.user,
-                room_id=room_id,
-                action=Membership.LEAVE,
-                content={},
-                ratelimit=False
-            )
+            try:
+                target_requester = create_requester(user_id)
+                yield self.room_member_handler.update_membership(
+                    requester=target_requester,
+                    target=target_requester.user,
+                    room_id=room_id,
+                    action=Membership.LEAVE,
+                    content={},
+                    ratelimit=False,
+                    require_consent=False,
+                )
 
-            yield self.room_member_handler.forget(target_requester.user, room_id)
+                yield self.room_member_handler.forget(target_requester.user, room_id)
 
-            yield self.room_member_handler.update_membership(
-                requester=target_requester,
-                target=target_requester.user,
-                room_id=new_room_id,
-                action=Membership.JOIN,
-                content={},
-                ratelimit=False
-            )
+                yield self.room_member_handler.update_membership(
+                    requester=target_requester,
+                    target=target_requester.user,
+                    room_id=new_room_id,
+                    action=Membership.JOIN,
+                    content={},
+                    ratelimit=False,
+                    require_consent=False,
+                )
 
-            kicked_users.append(user_id)
+                kicked_users.append(user_id)
+            except Exception:
+                logger.exception(
+                    "Failed to leave old room and join new room for %r", user_id,
+                )
+                failed_to_kick_users.append(user_id)
 
         yield self.event_creation_handler.create_and_send_nonmember_event(
             room_creator_requester,
@@ -544,6 +558,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
 
         defer.returnValue((200, {
             "kicked_users": kicked_users,
+            "failed_to_kick_users": failed_to_kick_users,
             "local_aliases": aliases_for_room,
             "new_room_id": new_room_id,
         }))
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 6121c5b6df..8d56effbb8 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -22,6 +22,7 @@ from twisted.internet import defer
 from twisted.web.client import PartialDownloadError
 
 from synapse.api.errors import Codes, LoginError, SynapseError
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.http.server import finish_request
 from synapse.http.servlet import (
     RestServlet,
@@ -97,6 +98,7 @@ class LoginRestServlet(ClientV1RestServlet):
         self.registration_handler = hs.get_registration_handler()
         self.handlers = hs.get_handlers()
         self._well_known_builder = WellKnownBuilder(hs)
+        self._address_ratelimiter = Ratelimiter()
 
     def on_GET(self, request):
         flows = []
@@ -129,6 +131,13 @@ class LoginRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
+        self._address_ratelimiter.ratelimit(
+            request.getClientIP(), time_now_s=self.hs.clock.time(),
+            rate_hz=self.hs.config.rc_login_address.per_second,
+            burst_count=self.hs.config.rc_login_address.burst_count,
+            update=True,
+        )
+
         login_submission = parse_json_object_from_request(request)
         try:
             if self.jwt_enabled and (login_submission["type"] ==
@@ -285,6 +294,7 @@ class LoginRestServlet(ClientV1RestServlet):
             raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
 
         user_id = UserID(user, self.hs.hostname).to_string()
+
         auth_handler = self.auth_handler
         registered_user_id = yield auth_handler.check_user_exists(user_id)
         if registered_user_id:
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 6f34029431..6d235262c8 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -210,8 +210,8 @@ class RegisterRestServlet(RestServlet):
 
         allowed, time_allowed = self.ratelimiter.can_do_action(
             client_addr, time_now_s=time_now,
-            rate_hz=self.hs.config.rc_registration_requests_per_second,
-            burst_count=self.hs.config.rc_registration_request_burst_count,
+            rate_hz=self.hs.config.rc_registration.per_second,
+            burst_count=self.hs.config.rc_registration.burst_count,
             update=False,
         )
 
diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py
index cac0624ba7..36b02de37f 100644
--- a/synapse/rest/client/v2_alpha/user_directory.py
+++ b/synapse/rest/client/v2_alpha/user_directory.py
@@ -59,6 +59,12 @@ class UserDirectorySearchRestServlet(RestServlet):
         requester = yield self.auth.get_user_by_req(request, allow_guest=False)
         user_id = requester.user.to_string()
 
+        if not self.hs.config.user_directory_search_enabled:
+            defer.returnValue((200, {
+                "limited": False,
+                "results": [],
+            }))
+
         body = parse_json_object_from_request(request)
 
         limit = body.get("limit", 10)
diff --git a/synapse/server.py b/synapse/server.py
index 72835e8c86..dc8f1ccb8c 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -42,7 +42,7 @@ from synapse.federation.federation_server import (
     ReplicationFederationHandlerRegistry,
 )
 from synapse.federation.send_queue import FederationRemoteSendQueue
-from synapse.federation.transaction_queue import TransactionQueue
+from synapse.federation.sender import FederationSender
 from synapse.federation.transport.client import TransportLayerClient
 from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
 from synapse.groups.groups_server import GroupsServerHandler
@@ -185,6 +185,10 @@ class HomeServer(object):
         'registration_handler',
     ]
 
+    REQUIRED_ON_MASTER_STARTUP = [
+        "user_directory_handler",
+    ]
+
     # This is overridden in derived application classes
     # (such as synapse.app.homeserver.SynapseHomeServer) and gives the class to be
     # instantiated during setup() for future return by get_datastore()
@@ -221,6 +225,15 @@ class HomeServer(object):
             conn.commit()
         logger.info("Finished setting up.")
 
+    def setup_master(self):
+        """
+        Some handlers have side effects on instantiation (like registering
+        background updates). This function causes them to be fetched, and
+        therefore instantiated, to run those side effects.
+        """
+        for i in self.REQUIRED_ON_MASTER_STARTUP:
+            getattr(self, "get_" + i)()
+
     def get_reactor(self):
         """
         Fetch the Twisted reactor in use by this HomeServer.
@@ -421,7 +434,7 @@ class HomeServer(object):
 
     def build_federation_sender(self):
         if self.should_send_federation():
-            return TransactionQueue(self)
+            return FederationSender(self)
         elif not self.config.worker_app:
             return FederationRemoteSendQueue(self)
         else:
diff --git a/synapse/server.pyi b/synapse/server.pyi
index fb8df56cd5..3ba3a967c2 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -1,5 +1,6 @@
 import synapse.api.auth
 import synapse.config.homeserver
+import synapse.federation.sender
 import synapse.federation.transaction_queue
 import synapse.federation.transport.client
 import synapse.handlers
@@ -62,7 +63,7 @@ class HomeServer(object):
     def get_set_password_handler(self) -> synapse.handlers.set_password.SetPasswordHandler:
         pass
 
-    def get_federation_sender(self) -> synapse.federation.transaction_queue.TransactionQueue:
+    def get_federation_sender(self) -> synapse.federation.sender.FederationSender:
         pass
 
     def get_federation_transport_client(self) -> synapse.federation.transport.client.TransportLayerClient:
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index a0333d5309..7e3903859b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -767,18 +767,25 @@ class SQLBaseStore(object):
         """
         allvalues = {}
         allvalues.update(keyvalues)
-        allvalues.update(values)
         allvalues.update(insertion_values)
 
+        if not values:
+            latter = "NOTHING"
+        else:
+            allvalues.update(values)
+            latter = (
+                "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
+            )
+
         sql = (
             "INSERT INTO %s (%s) VALUES (%s) "
-            "ON CONFLICT (%s) DO UPDATE SET %s"
+            "ON CONFLICT (%s) DO %s"
         ) % (
             table,
             ", ".join(k for k in allvalues),
             ", ".join("?" for _ in allvalues),
             ", ".join(k for k in keyvalues),
-            ", ".join(k + "=EXCLUDED." + k for k in values),
+            latter
         )
         txn.execute(sql, list(allvalues.values()))
 
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 60cdc884e6..a2f8c23a65 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -52,7 +52,9 @@ class BackgroundUpdatePerformance(object):
         Returns:
             A duration in ms as a float
         """
-        if self.total_item_count == 0:
+        if self.avg_duration_ms == 0:
+            return 0
+        elif self.total_item_count == 0:
             return None
         else:
             # Use the exponential moving average so that we can adapt to
@@ -64,7 +66,9 @@ class BackgroundUpdatePerformance(object):
         Returns:
             A duration in ms as a float
         """
-        if self.total_item_count == 0:
+        if self.total_duration_ms == 0:
+            return 0
+        elif self.total_item_count == 0:
             return None
         else:
             return float(self.total_item_count) / float(self.total_duration_ms)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 6a5028961d..4b8438c3e9 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -186,6 +186,63 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
         defer.returnValue(results)
 
     @defer.inlineCallbacks
+    def move_push_rule_from_room_to_room(
+        self, new_room_id, user_id, rule,
+    ):
+        """Move a single push rule from one room to another for a specific user.
+
+        Args:
+            new_room_id (str): ID of the new room.
+            user_id (str): ID of user the push rule belongs to.
+            rule (Dict): A push rule.
+        """
+        # Create new rule id
+        rule_id_scope = '/'.join(rule["rule_id"].split('/')[:-1])
+        new_rule_id = rule_id_scope + "/" + new_room_id
+
+        # Change room id in each condition
+        for condition in rule.get("conditions", []):
+            if condition.get("key") == "room_id":
+                condition["pattern"] = new_room_id
+
+        # Add the rule for the new room
+        yield self.add_push_rule(
+            user_id=user_id,
+            rule_id=new_rule_id,
+            priority_class=rule["priority_class"],
+            conditions=rule["conditions"],
+            actions=rule["actions"],
+        )
+
+        # Delete push rule for the old room
+        yield self.delete_push_rule(user_id, rule["rule_id"])
+
+    @defer.inlineCallbacks
+    def move_push_rules_from_room_to_room_for_user(
+        self, old_room_id, new_room_id, user_id,
+    ):
+        """Move all of the push rules from one room to another for a specific
+        user.
+
+        Args:
+            old_room_id (str): ID of the old room.
+            new_room_id (str): ID of the new room.
+            user_id (str): ID of user to copy push rules for.
+        """
+        # Retrieve push rules for this user
+        user_push_rules = yield self.get_push_rules_for_user(user_id)
+
+        # Get rules relating to the old room, move them to the new room, then
+        # delete them from the old room
+        for rule in user_push_rules:
+            conditions = rule.get("conditions", [])
+            if any((c.get("key") == "room_id" and
+                    c.get("pattern") == old_room_id) for c in conditions):
+                self.move_push_rule_from_room_to_room(
+                    new_room_id, user_id, rule,
+                )
+
+    @defer.inlineCallbacks
     def bulk_get_push_rules_for_room(self, event, context):
         state_group = context.state_group
         if not state_group:
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 41c65e112a..a979d4860a 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -500,10 +500,22 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
     @defer.inlineCallbacks
     def block_room(self, room_id, user_id):
-        yield self._simple_insert(
+        """Marks the room as blocked. Can be called multiple times.
+
+        Args:
+            room_id (str): Room to block
+            user_id (str): Who blocked it
+
+        Returns:
+            Deferred
+        """
+        yield self._simple_upsert(
             table="blocked_rooms",
-            values={
+            keyvalues={
                 "room_id": room_id,
+            },
+            values={},
+            insertion_values={
                 "user_id": user_id,
             },
             desc="block_room",
diff --git a/synapse/storage/schema/delta/53/user_dir_populate.sql b/synapse/storage/schema/delta/53/user_dir_populate.sql
new file mode 100644
index 0000000000..ffcc896b58
--- /dev/null
+++ b/synapse/storage/schema/delta/53/user_dir_populate.sql
@@ -0,0 +1,30 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Set up staging tables
+INSERT INTO background_updates (update_name, progress_json) VALUES
+    ('populate_user_directory_createtables', '{}');
+
+-- Run through each room and update the user directory according to who is in it
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables');
+
+-- Insert all users, if search_all_users is on
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users');
diff --git a/synapse/storage/schema/delta/53/user_share.sql b/synapse/storage/schema/delta/53/user_share.sql
index 14424ded0c..5831b1a6f8 100644
--- a/synapse/storage/schema/delta/53/user_share.sql
+++ b/synapse/storage/schema/delta/53/user_share.sql
@@ -16,9 +16,6 @@
 -- Old disused version of the tables below.
 DROP TABLE IF EXISTS users_who_share_rooms;
 
--- This is no longer used because it's duplicated by the users_who_share_public_rooms
-DROP TABLE IF EXISTS users_in_public_rooms;
-
 -- Tables keeping track of what users share rooms. This is a map of local users
 -- to local or remote users, per room. Remote users cannot be in the user_id
 -- column, only the other_user_id column. There are two tables, one for public
diff --git a/synapse/storage/schema/delta/53/users_in_public_rooms.sql b/synapse/storage/schema/delta/53/users_in_public_rooms.sql
new file mode 100644
index 0000000000..f7827ca6d2
--- /dev/null
+++ b/synapse/storage/schema/delta/53/users_in_public_rooms.sql
@@ -0,0 +1,28 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- We don't need the old version of this table.
+DROP TABLE IF EXISTS users_in_public_rooms;
+
+-- Old version of users_in_public_rooms
+DROP TABLE IF EXISTS users_who_share_public_rooms;
+
+-- Track what users are in public rooms.
+CREATE TABLE IF NOT EXISTS users_in_public_rooms (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX users_in_public_rooms_u_idx ON users_in_public_rooms(user_id, room_id);
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 2317d22ed6..d360e857d1 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -16,22 +16,301 @@
 import logging
 import re
 
-from six import iteritems
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules
+from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.storage.state import StateFilter
 from synapse.types import get_domain_from_id, get_localpart_from_id
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-
-from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached
 
 logger = logging.getLogger(__name__)
 
 
-class UserDirectoryStore(SQLBaseStore):
+TEMP_TABLE = "_temp_populate_user_directory"
+
+
+class UserDirectoryStore(BackgroundUpdateStore):
+
+    # How many records do we calculate before sending it to
+    # add_users_who_share_private_rooms?
+    SHARE_PRIVATE_WORKING_SET = 500
+
+    def __init__(self, db_conn, hs):
+        super(UserDirectoryStore, self).__init__(db_conn, hs)
+
+        self.server_name = hs.hostname
+
+        self.register_background_update_handler(
+            "populate_user_directory_createtables",
+            self._populate_user_directory_createtables,
+        )
+        self.register_background_update_handler(
+            "populate_user_directory_process_rooms",
+            self._populate_user_directory_process_rooms,
+        )
+        self.register_background_update_handler(
+            "populate_user_directory_process_users",
+            self._populate_user_directory_process_users,
+        )
+        self.register_background_update_handler(
+            "populate_user_directory_cleanup", self._populate_user_directory_cleanup
+        )
+
+    @defer.inlineCallbacks
+    def _populate_user_directory_createtables(self, progress, batch_size):
+
+        # Get all the rooms that we want to process.
+        def _make_staging_area(txn):
+            sql = (
+                "CREATE TABLE IF NOT EXISTS "
+                + TEMP_TABLE
+                + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
+            )
+            txn.execute(sql)
+
+            sql = (
+                "CREATE TABLE IF NOT EXISTS "
+                + TEMP_TABLE
+                + "_position(position TEXT NOT NULL)"
+            )
+            txn.execute(sql)
+
+            # Get rooms we want to process from the database
+            sql = """
+                SELECT room_id, count(*) FROM current_state_events
+                GROUP BY room_id
+            """
+            txn.execute(sql)
+            rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
+            self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
+            del rooms
+
+            # If search all users is on, get all the users we want to add.
+            if self.hs.config.user_directory_search_all_users:
+                sql = (
+                    "CREATE TABLE IF NOT EXISTS "
+                    + TEMP_TABLE
+                    + "_users(user_id TEXT NOT NULL)"
+                )
+                txn.execute(sql)
+
+                txn.execute("SELECT name FROM users")
+                users = [{"user_id": x[0]} for x in txn.fetchall()]
+
+                self._simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
+
+        new_pos = yield self.get_max_stream_id_in_current_state_deltas()
+        yield self.runInteraction(
+            "populate_user_directory_temp_build", _make_staging_area
+        )
+        yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
+
+        yield self._end_background_update("populate_user_directory_createtables")
+        defer.returnValue(1)
+
+    @defer.inlineCallbacks
+    def _populate_user_directory_cleanup(self, progress, batch_size):
+        """
+        Update the user directory stream position, then clean up the old tables.
+        """
+        position = yield self._simple_select_one_onecol(
+            TEMP_TABLE + "_position", None, "position"
+        )
+        yield self.update_user_directory_stream_pos(position)
+
+        def _delete_staging_area(txn):
+            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
+            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
+            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+
+        yield self.runInteraction(
+            "populate_user_directory_cleanup", _delete_staging_area
+        )
+
+        yield self._end_background_update("populate_user_directory_cleanup")
+        defer.returnValue(1)
+
+    @defer.inlineCallbacks
+    def _populate_user_directory_process_rooms(self, progress, batch_size):
+
+        state = self.hs.get_state_handler()
+
+        # If we don't have progress filed, delete everything.
+        if not progress:
+            yield self.delete_all_from_user_dir()
+
+        def _get_next_batch(txn):
+            sql = """
+                SELECT room_id FROM %s
+                ORDER BY events DESC
+                LIMIT %s
+            """ % (
+                TEMP_TABLE + "_rooms",
+                str(batch_size),
+            )
+            txn.execute(sql)
+            rooms_to_work_on = txn.fetchall()
+
+            if not rooms_to_work_on:
+                return None
+
+            rooms_to_work_on = [x[0] for x in rooms_to_work_on]
+
+            # Get how many are left to process, so we can give status on how
+            # far we are in processing
+            txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+            progress["remaining"] = txn.fetchone()[0]
+
+            return rooms_to_work_on
+
+        rooms_to_work_on = yield self.runInteraction(
+            "populate_user_directory_temp_read", _get_next_batch
+        )
+
+        # No more rooms -- complete the transaction.
+        if not rooms_to_work_on:
+            yield self._end_background_update("populate_user_directory_process_rooms")
+            defer.returnValue(1)
+
+        logger.info(
+            "Processing the next %d rooms of %d remaining"
+            % (len(rooms_to_work_on), progress["remaining"])
+        )
+
+        for room_id in rooms_to_work_on:
+            is_in_room = yield self.is_host_joined(room_id, self.server_name)
+
+            if is_in_room:
+                is_public = yield self.is_room_world_readable_or_publicly_joinable(
+                    room_id
+                )
+
+                users_with_profile = yield state.get_current_user_in_room(room_id)
+                user_ids = set(users_with_profile)
+
+                # Update each user in the user directory.
+                for user_id, profile in users_with_profile.items():
+                    yield self.update_profile_in_user_dir(
+                        user_id, profile.display_name, profile.avatar_url
+                    )
+
+                to_insert = set()
+
+                if is_public:
+                    for user_id in user_ids:
+                        if self.get_if_app_services_interested_in_user(user_id):
+                            continue
+
+                        to_insert.add(user_id)
+
+                    if to_insert:
+                        yield self.add_users_in_public_rooms(room_id, to_insert)
+                        to_insert.clear()
+                else:
+                    for user_id in user_ids:
+                        if not self.hs.is_mine_id(user_id):
+                            continue
+
+                        if self.get_if_app_services_interested_in_user(user_id):
+                            continue
+
+                        for other_user_id in user_ids:
+                            if user_id == other_user_id:
+                                continue
+
+                            user_set = (user_id, other_user_id)
+                            to_insert.add(user_set)
+
+                            # If it gets too big, stop and write to the database
+                            # to prevent storing too much in RAM.
+                            if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET:
+                                yield self.add_users_who_share_private_room(
+                                    room_id, to_insert
+                                )
+                                to_insert.clear()
+
+                    if to_insert:
+                        yield self.add_users_who_share_private_room(room_id, to_insert)
+                        to_insert.clear()
+
+            # We've finished a room. Delete it from the table.
+            yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
+            # Update the remaining counter.
+            progress["remaining"] -= 1
+            yield self.runInteraction(
+                "populate_user_directory",
+                self._background_update_progress_txn,
+                "populate_user_directory_process_rooms",
+                progress,
+            )
+
+        defer.returnValue(len(rooms_to_work_on))
+
+    @defer.inlineCallbacks
+    def _populate_user_directory_process_users(self, progress, batch_size):
+        """
+        If search_all_users is enabled, add all of the users to the user directory.
+        """
+        if not self.hs.config.user_directory_search_all_users:
+            yield self._end_background_update("populate_user_directory_process_users")
+            defer.returnValue(1)
+
+        def _get_next_batch(txn):
+            sql = "SELECT user_id FROM %s LIMIT %s" % (
+                TEMP_TABLE + "_users",
+                str(batch_size),
+            )
+            txn.execute(sql)
+            users_to_work_on = txn.fetchall()
+
+            if not users_to_work_on:
+                return None
+
+            users_to_work_on = [x[0] for x in users_to_work_on]
+
+            # Get how many are left to process, so we can give status on how
+            # far we are in processing
+            sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
+            txn.execute(sql)
+            progress["remaining"] = txn.fetchone()[0]
+
+            return users_to_work_on
+
+        users_to_work_on = yield self.runInteraction(
+            "populate_user_directory_temp_read", _get_next_batch
+        )
+
+        # No more users -- complete the transaction.
+        if not users_to_work_on:
+            yield self._end_background_update("populate_user_directory_process_users")
+            defer.returnValue(1)
+
+        logger.info(
+            "Processing the next %d users of %d remaining"
+            % (len(users_to_work_on), progress["remaining"])
+        )
+
+        for user_id in users_to_work_on:
+            profile = yield self.get_profileinfo(get_localpart_from_id(user_id))
+            yield self.update_profile_in_user_dir(
+                user_id, profile.display_name, profile.avatar_url
+            )
+
+            # We've finished processing a user. Delete it from the table.
+            yield self._simple_delete_one(TEMP_TABLE + "_users", {"user_id": user_id})
+            # Update the remaining counter.
+            progress["remaining"] -= 1
+            yield self.runInteraction(
+                "populate_user_directory",
+                self._background_update_progress_txn,
+                "populate_user_directory_process_users",
+                progress,
+            )
+
+        defer.returnValue(len(users_to_work_on))
+
     @defer.inlineCallbacks
     def is_room_world_readable_or_publicly_joinable(self, room_id):
         """Check if the room is either world_readable or publically joinable
@@ -63,89 +342,16 @@ class UserDirectoryStore(SQLBaseStore):
 
         defer.returnValue(False)
 
-    def add_profiles_to_user_dir(self, users_with_profile):
-        """Add profiles to the user directory
-
-        Args:
-            users_with_profile (dict): Users to add to directory in the form of
-                mapping of user_id -> ProfileInfo
+    def update_profile_in_user_dir(self, user_id, display_name, avatar_url):
+        """
+        Update or add a user's profile in the user directory.
         """
 
-        if isinstance(self.database_engine, PostgresEngine):
-            # We weight the loclpart most highly, then display name and finally
-            # server name
-            sql = """
-                INSERT INTO user_directory_search(user_id, vector)
-                VALUES (?,
-                    setweight(to_tsvector('english', ?), 'A')
-                    || setweight(to_tsvector('english', ?), 'D')
-                    || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
-                )
-            """
-            args = (
-                (
-                    user_id,
-                    get_localpart_from_id(user_id),
-                    get_domain_from_id(user_id),
-                    profile.display_name,
-                )
-                for user_id, profile in iteritems(users_with_profile)
-            )
-        elif isinstance(self.database_engine, Sqlite3Engine):
-            sql = """
-                INSERT INTO user_directory_search(user_id, value)
-                VALUES (?,?)
-            """
-            args = tuple(
-                (
-                    user_id,
-                    "%s %s" % (user_id, p.display_name) if p.display_name else user_id,
-                )
-                for user_id, p in iteritems(users_with_profile)
-            )
-        else:
-            # This should be unreachable.
-            raise Exception("Unrecognized database engine")
-
-        def _add_profiles_to_user_dir_txn(txn):
-            txn.executemany(sql, args)
-            self._simple_insert_many_txn(
-                txn,
-                table="user_directory",
-                values=[
-                    {
-                        "user_id": user_id,
-                        "room_id": None,
-                        "display_name": profile.display_name,
-                        "avatar_url": profile.avatar_url,
-                    }
-                    for user_id, profile in iteritems(users_with_profile)
-                ],
-            )
-            for user_id in users_with_profile:
-                txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
-
-        return self.runInteraction(
-            "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn
-        )
-
-    @defer.inlineCallbacks
-    def update_user_in_user_dir(self, user_id, room_id):
-        yield self._simple_update_one(
-            table="user_directory",
-            keyvalues={"user_id": user_id},
-            updatevalues={"room_id": room_id},
-            desc="update_user_in_user_dir",
-        )
-        self.get_user_in_directory.invalidate((user_id,))
-
-    def update_profile_in_user_dir(self, user_id, display_name, avatar_url, room_id):
         def _update_profile_in_user_dir_txn(txn):
             new_entry = self._simple_upsert_txn(
                 txn,
                 table="user_directory",
                 keyvalues={"user_id": user_id},
-                insertion_values={"room_id": room_id},
                 values={"display_name": display_name, "avatar_url": avatar_url},
                 lock=False,  # We're only inserter
             )
@@ -242,14 +448,7 @@ class UserDirectoryStore(SQLBaseStore):
                 txn, table="user_directory_search", keyvalues={"user_id": user_id}
             )
             self._simple_delete_txn(
-                txn,
-                table="users_who_share_public_rooms",
-                keyvalues={"user_id": user_id},
-            )
-            self._simple_delete_txn(
-                txn,
-                table="users_who_share_public_rooms",
-                keyvalues={"other_user_id": user_id},
+                txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
             )
             self._simple_delete_txn(
                 txn,
@@ -271,9 +470,9 @@ class UserDirectoryStore(SQLBaseStore):
         in the given room_id
         """
         user_ids_share_pub = yield self._simple_select_onecol(
-            table="users_who_share_public_rooms",
+            table="users_in_public_rooms",
             keyvalues={"room_id": room_id},
-            retcol="other_user_id",
+            retcol="user_id",
             desc="get_users_in_dir_due_to_room",
         )
 
@@ -290,18 +489,6 @@ class UserDirectoryStore(SQLBaseStore):
         defer.returnValue(user_ids)
 
     @defer.inlineCallbacks
-    def get_all_rooms(self):
-        """Get all room_ids we've ever known about, in ascending order of "size"
-        """
-        sql = """
-            SELECT room_id FROM current_state_events
-            GROUP BY room_id
-            ORDER BY count(*) ASC
-        """
-        rows = yield self._execute("get_all_rooms", None, sql)
-        defer.returnValue([room_id for room_id, in rows])
-
-    @defer.inlineCallbacks
     def get_all_local_users(self):
         """Get all local users
         """
@@ -311,26 +498,19 @@ class UserDirectoryStore(SQLBaseStore):
         rows = yield self._execute("get_all_local_users", None, sql)
         defer.returnValue([name for name, in rows])
 
-    def add_users_who_share_room(self, room_id, share_private, user_id_tuples):
-        """Insert entries into the users_who_share_*_rooms table. The first
+    def add_users_who_share_private_room(self, room_id, user_id_tuples):
+        """Insert entries into the users_who_share_private_rooms table. The first
         user should be a local user.
 
         Args:
             room_id (str)
-            share_private (bool): Is the room private
             user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
         """
 
         def _add_users_who_share_room_txn(txn):
-
-            if share_private:
-                tbl = "users_who_share_private_rooms"
-            else:
-                tbl = "users_who_share_public_rooms"
-
             self._simple_upsert_many_txn(
                 txn,
-                table=tbl,
+                table="users_who_share_private_rooms",
                 key_names=["user_id", "other_user_id", "room_id"],
                 key_values=[
                     (user_id, other_user_id, room_id)
@@ -339,15 +519,35 @@ class UserDirectoryStore(SQLBaseStore):
                 value_names=(),
                 value_values=None,
             )
-            for user_id, other_user_id in user_id_tuples:
-                txn.call_after(
-                    self.get_users_who_share_room_from_dir.invalidate, (user_id,)
-                )
 
         return self.runInteraction(
             "add_users_who_share_room", _add_users_who_share_room_txn
         )
 
+    def add_users_in_public_rooms(self, room_id, user_ids):
+        """Insert entries into the users_who_share_private_rooms table. The first
+        user should be a local user.
+
+        Args:
+            room_id (str)
+            user_ids (list[str])
+        """
+
+        def _add_users_in_public_rooms_txn(txn):
+
+            self._simple_upsert_many_txn(
+                txn,
+                table="users_in_public_rooms",
+                key_names=["user_id", "room_id"],
+                key_values=[(user_id, room_id) for user_id in user_ids],
+                value_names=(),
+                value_values=None,
+            )
+
+        return self.runInteraction(
+            "add_users_in_public_rooms", _add_users_in_public_rooms_txn
+        )
+
     def remove_user_who_share_room(self, user_id, room_id):
         """
         Deletes entries in the users_who_share_*_rooms table. The first
@@ -371,25 +571,18 @@ class UserDirectoryStore(SQLBaseStore):
             )
             self._simple_delete_txn(
                 txn,
-                table="users_who_share_public_rooms",
+                table="users_in_public_rooms",
                 keyvalues={"user_id": user_id, "room_id": room_id},
             )
-            self._simple_delete_txn(
-                txn,
-                table="users_who_share_public_rooms",
-                keyvalues={"other_user_id": user_id, "room_id": room_id},
-            )
-            txn.call_after(
-                self.get_users_who_share_room_from_dir.invalidate, (user_id,)
-            )
 
         return self.runInteraction(
             "remove_user_who_share_room", _remove_user_who_share_room_txn
         )
 
-    @cachedInlineCallbacks(max_entries=500000, iterable=True)
-    def get_users_who_share_room_from_dir(self, user_id):
-        """Returns the set of users who share a room with `user_id`
+    @defer.inlineCallbacks
+    def get_user_dir_rooms_user_is_in(self, user_id):
+        """
+        Returns the rooms that a user is in.
 
         Args:
             user_id(str): Must be a local user
@@ -400,23 +593,19 @@ class UserDirectoryStore(SQLBaseStore):
         rows = yield self._simple_select_onecol(
             table="users_who_share_private_rooms",
             keyvalues={"user_id": user_id},
-            retcol="other_user_id",
-            desc="get_users_who_share_room_with_user",
+            retcol="room_id",
+            desc="get_rooms_user_is_in",
         )
 
         pub_rows = yield self._simple_select_onecol(
-            table="users_who_share_public_rooms",
+            table="users_in_public_rooms",
             keyvalues={"user_id": user_id},
-            retcol="other_user_id",
-            desc="get_users_who_share_room_with_user",
+            retcol="room_id",
+            desc="get_rooms_user_is_in",
         )
 
         users = set(pub_rows)
         users.update(rows)
-
-        # Remove the user themselves from this list.
-        users.discard(user_id)
-
         defer.returnValue(list(users))
 
     @defer.inlineCallbacks
@@ -452,10 +641,9 @@ class UserDirectoryStore(SQLBaseStore):
         def _delete_all_from_user_dir_txn(txn):
             txn.execute("DELETE FROM user_directory")
             txn.execute("DELETE FROM user_directory_search")
-            txn.execute("DELETE FROM users_who_share_public_rooms")
+            txn.execute("DELETE FROM users_in_public_rooms")
             txn.execute("DELETE FROM users_who_share_private_rooms")
             txn.call_after(self.get_user_in_directory.invalidate_all)
-            txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all)
 
         return self.runInteraction(
             "delete_all_from_user_dir", _delete_all_from_user_dir_txn
@@ -560,23 +748,19 @@ class UserDirectoryStore(SQLBaseStore):
         """
 
         if self.hs.config.user_directory_search_all_users:
-            # make s.user_id null to keep the ordering algorithm happy
-            join_clause = """
-                CROSS JOIN (SELECT NULL as user_id) AS s
-            """
-            join_args = ()
-            where_clause = "1=1"
+            join_args = (user_id,)
+            where_clause = "user_id != ?"
         else:
-            join_clause = """
-                LEFT JOIN (
-                    SELECT other_user_id AS user_id FROM users_who_share_public_rooms
-                    UNION
-                    SELECT other_user_id AS user_id FROM users_who_share_private_rooms
-                    WHERE user_id = ?
-                ) AS p USING (user_id)
-            """
             join_args = (user_id,)
-            where_clause = "p.user_id IS NOT NULL"
+            where_clause = """
+                (
+                    EXISTS (select 1 from users_in_public_rooms WHERE user_id = t.user_id)
+                    OR EXISTS (
+                        SELECT 1 FROM users_who_share_private_rooms
+                        WHERE user_id = ? AND other_user_id = t.user_id
+                    )
+                )
+            """
 
         if isinstance(self.database_engine, PostgresEngine):
             full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
@@ -588,9 +772,8 @@ class UserDirectoryStore(SQLBaseStore):
             # search: (domain, _, display name, localpart)
             sql = """
                 SELECT d.user_id AS user_id, display_name, avatar_url
-                FROM user_directory_search
+                FROM user_directory_search as t
                 INNER JOIN user_directory AS d USING (user_id)
-                %s
                 WHERE
                     %s
                     AND vector @@ to_tsquery('english', ?)
@@ -617,7 +800,6 @@ class UserDirectoryStore(SQLBaseStore):
                     avatar_url IS NULL
                 LIMIT ?
             """ % (
-                join_clause,
                 where_clause,
             )
             args = join_args + (full_query, exact_query, prefix_query, limit + 1)
@@ -626,9 +808,8 @@ class UserDirectoryStore(SQLBaseStore):
 
             sql = """
                 SELECT d.user_id AS user_id, display_name, avatar_url
-                FROM user_directory_search
+                FROM user_directory_search as t
                 INNER JOIN user_directory AS d USING (user_id)
-                %s
                 WHERE
                     %s
                     AND value MATCH ?
@@ -638,7 +819,6 @@ class UserDirectoryStore(SQLBaseStore):
                     avatar_url IS NULL
                 LIMIT ?
             """ % (
-                join_clause,
                 where_clause,
             )
             args = join_args + (search_query, limit + 1)
diff --git a/synapse/types.py b/synapse/types.py
index d8cb64addb..3de94b6335 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -16,6 +16,8 @@ import re
 import string
 from collections import namedtuple
 
+import attr
+
 from synapse.api.errors import SynapseError
 
 
@@ -455,3 +457,13 @@ class ThirdPartyInstanceID(
     @classmethod
     def create(cls, appservice_id, network_id,):
         return cls(appservice_id=appservice_id, network_id=network_id)
+
+
+@attr.s(slots=True)
+class ReadReceipt(object):
+    """Information about a read-receipt"""
+    room_id = attr.ib()
+    receipt_type = attr.ib()
+    user_id = attr.ib()
+    event_ids = attr.ib()
+    data = attr.ib()