summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.circleci/config.yml48
-rw-r--r--.dockerignore3
-rw-r--r--MANIFEST.in1
-rw-r--r--changelog.d/3568.feature1
-rw-r--r--changelog.d/3653.feature1
-rw-r--r--changelog.d/3660.misc1
-rw-r--r--changelog.d/3661.bugfix1
-rw-r--r--changelog.d/3669.misc1
-rw-r--r--changelog.d/3670.feature1
-rw-r--r--changelog.d/3681.bugfix1
-rw-r--r--changelog.d/3684.misc1
-rw-r--r--changelog.d/3687.feature1
-rw-r--r--changelog.d/3689.bugfix1
-rw-r--r--changelog.d/3690.misc1
-rw-r--r--changelog.d/3692.bugfix1
-rw-r--r--docker/Dockerfile2
-rw-r--r--docs/workers.rst13
-rw-r--r--synapse/api/auth.py20
-rw-r--r--synapse/api/errors.py16
-rw-r--r--synapse/app/client_reader.py4
-rw-r--r--synapse/app/event_creator.py4
-rw-r--r--synapse/app/federation_reader.py14
-rw-r--r--synapse/app/federation_sender.py4
-rwxr-xr-xsynapse/app/homeserver.py6
-rw-r--r--synapse/app/media_repository.py4
-rw-r--r--synapse/config/server.py4
-rw-r--r--synapse/federation/federation_server.py54
-rw-r--r--synapse/handlers/auth.py24
-rw-r--r--synapse/handlers/deactivate_account.py13
-rw-r--r--synapse/handlers/federation.py74
-rw-r--r--synapse/handlers/identity.py32
-rw-r--r--synapse/handlers/message.py88
-rw-r--r--synapse/handlers/register.py18
-rw-r--r--synapse/handlers/sync.py13
-rw-r--r--synapse/replication/http/__init__.py3
-rw-r--r--synapse/replication/http/federation.py259
-rw-r--r--synapse/replication/slave/storage/transactions.py13
-rw-r--r--synapse/rest/client/v1/admin.py11
-rw-r--r--synapse/rest/client/v1/room.py32
-rw-r--r--synapse/rest/client/v2_alpha/account.py22
-rw-r--r--synapse/server.py6
-rw-r--r--synapse/storage/events.py84
-rw-r--r--synapse/storage/events_worker.py83
-rw-r--r--synapse/storage/monthly_active_users.py56
-rw-r--r--synapse/storage/room.py32
-rw-r--r--synapse/storage/state.py66
-rw-r--r--tests/api/test_auth.py8
-rw-r--r--tests/handlers/test_auth.py38
-rw-r--r--tests/handlers/test_register.py28
-rw-r--r--tests/handlers/test_sync.py71
-rw-r--r--tests/rest/client/v1/test_typing.py66
-rw-r--r--tests/storage/test_client_ips.py12
-rw-r--r--tests/storage/test_monthly_active_users.py15
-rw-r--r--tests/storage/test_state.py2
-rw-r--r--tests/unittest.py144
-rw-r--r--tests/utils.py1
56 files changed, 1235 insertions, 288 deletions
diff --git a/.circleci/config.yml b/.circleci/config.yml
new file mode 100644
index 0000000000..e03f01b837
--- /dev/null
+++ b/.circleci/config.yml
@@ -0,0 +1,48 @@
+version: 2
+jobs:
+  sytestpy2:
+    machine: true
+    steps:
+      - checkout
+      - run: docker pull matrixdotorg/sytest-synapsepy2
+      - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2
+      - store_artifacts:
+          path: ~/project/logs
+          destination: logs
+  sytestpy2postgres:
+    machine: true
+    steps:
+      - checkout
+      - run: docker pull matrixdotorg/sytest-synapsepy2
+      - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2
+      - store_artifacts:
+          path: ~/project/logs
+          destination: logs
+  sytestpy3:
+    machine: true
+    steps:
+      - checkout
+      - run: docker pull matrixdotorg/sytest-synapsepy3
+      - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3
+      - store_artifacts:
+          path: ~/project/logs
+          destination: logs
+  sytestpy3postgres:
+    machine: true
+    steps:
+      - checkout
+      - run: docker pull matrixdotorg/sytest-synapsepy3
+      - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3
+      - store_artifacts:
+          path: ~/project/logs
+          destination: logs
+
+workflows:
+  version: 2
+  build:
+    jobs:
+      - sytestpy2
+      - sytestpy2postgres
+# Currently broken while the Python 3 port is incomplete
+#      - sytestpy3
+#      - sytestpy3postgres
diff --git a/.dockerignore b/.dockerignore
index f36f86fbb7..6cdb8532d3 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -3,3 +3,6 @@ Dockerfile
 .gitignore
 demo/etc
 tox.ini
+synctl
+.git/*
+.tox/*
diff --git a/MANIFEST.in b/MANIFEST.in
index 1ff98d95df..e0826ba544 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -36,3 +36,4 @@ recursive-include changelog.d *
 prune .github
 prune demo/etc
 prune docker
+prune .circleci
diff --git a/changelog.d/3568.feature b/changelog.d/3568.feature
new file mode 100644
index 0000000000..247f02ba4e
--- /dev/null
+++ b/changelog.d/3568.feature
@@ -0,0 +1 @@
+speed up /members API and add `at` and `membership` params as per MSC1227
diff --git a/changelog.d/3653.feature b/changelog.d/3653.feature
new file mode 100644
index 0000000000..6c5422994f
--- /dev/null
+++ b/changelog.d/3653.feature
@@ -0,0 +1 @@
+Support more federation endpoints on workers
diff --git a/changelog.d/3660.misc b/changelog.d/3660.misc
new file mode 100644
index 0000000000..acd814c273
--- /dev/null
+++ b/changelog.d/3660.misc
@@ -0,0 +1 @@
+Sytests can now be run inside a Docker container.
diff --git a/changelog.d/3661.bugfix b/changelog.d/3661.bugfix
new file mode 100644
index 0000000000..f2b4703d80
--- /dev/null
+++ b/changelog.d/3661.bugfix
@@ -0,0 +1 @@
+Fix bug on deleting 3pid when using identity servers that don't support unbind API
diff --git a/changelog.d/3669.misc b/changelog.d/3669.misc
new file mode 100644
index 0000000000..fc579ddc60
--- /dev/null
+++ b/changelog.d/3669.misc
@@ -0,0 +1 @@
+Update docker base image from alpine 3.7 to 3.8.
diff --git a/changelog.d/3670.feature b/changelog.d/3670.feature
new file mode 100644
index 0000000000..ba00f2d2ec
--- /dev/null
+++ b/changelog.d/3670.feature
@@ -0,0 +1 @@
+Where server is disabled, block ability for locked out users to read new messages
diff --git a/changelog.d/3681.bugfix b/changelog.d/3681.bugfix
new file mode 100644
index 0000000000..d18a69cd0c
--- /dev/null
+++ b/changelog.d/3681.bugfix
@@ -0,0 +1 @@
+Fixes test_reap_monthly_active_users so it passes under postgres
diff --git a/changelog.d/3684.misc b/changelog.d/3684.misc
new file mode 100644
index 0000000000..4c013263c4
--- /dev/null
+++ b/changelog.d/3684.misc
@@ -0,0 +1 @@
+Implemented a new testing base class to reduce test boilerplate.
diff --git a/changelog.d/3687.feature b/changelog.d/3687.feature
new file mode 100644
index 0000000000..64b89f6411
--- /dev/null
+++ b/changelog.d/3687.feature
@@ -0,0 +1 @@
+set admin uri via config, to be used in error messages where the user should contact the administrator
diff --git a/changelog.d/3689.bugfix b/changelog.d/3689.bugfix
new file mode 100644
index 0000000000..934d039836
--- /dev/null
+++ b/changelog.d/3689.bugfix
@@ -0,0 +1 @@
+Fix mau blocking calulation bug on login
diff --git a/changelog.d/3690.misc b/changelog.d/3690.misc
new file mode 100644
index 0000000000..710add0243
--- /dev/null
+++ b/changelog.d/3690.misc
@@ -0,0 +1 @@
+Rename MAU prometheus metrics
diff --git a/changelog.d/3692.bugfix b/changelog.d/3692.bugfix
new file mode 100644
index 0000000000..f44e13dca1
--- /dev/null
+++ b/changelog.d/3692.bugfix
@@ -0,0 +1 @@
+Fix missing yield in synapse.storage.monthly_active_users.initialise_reserved_users
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 26fb3a6bff..777976217d 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,4 +1,4 @@
-FROM docker.io/python:2-alpine3.7
+FROM docker.io/python:2-alpine3.8
 
 RUN apk add --no-cache --virtual .nacl_deps \
         build-base \
diff --git a/docs/workers.rst b/docs/workers.rst
index c5b37c3ded..ac9efb621f 100644
--- a/docs/workers.rst
+++ b/docs/workers.rst
@@ -173,10 +173,23 @@ endpoints matching the following regular expressions::
     ^/_matrix/federation/v1/backfill/
     ^/_matrix/federation/v1/get_missing_events/
     ^/_matrix/federation/v1/publicRooms
+    ^/_matrix/federation/v1/query/
+    ^/_matrix/federation/v1/make_join/
+    ^/_matrix/federation/v1/make_leave/
+    ^/_matrix/federation/v1/send_join/
+    ^/_matrix/federation/v1/send_leave/
+    ^/_matrix/federation/v1/invite/
+    ^/_matrix/federation/v1/query_auth/
+    ^/_matrix/federation/v1/event_auth/
+    ^/_matrix/federation/v1/exchange_third_party_invite/
+    ^/_matrix/federation/v1/send/
 
 The above endpoints should all be routed to the federation_reader worker by the
 reverse-proxy configuration.
 
+The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single
+instance.
+
 ``synapse.app.federation_sender``
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 9c62ec4374..3b2a2ab77a 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -775,17 +775,31 @@ class Auth(object):
             )
 
     @defer.inlineCallbacks
-    def check_auth_blocking(self):
+    def check_auth_blocking(self, user_id=None):
         """Checks if the user should be rejected for some external reason,
         such as monthly active user limiting or global disable flag
+
+        Args:
+            user_id(str|None): If present, checks for presence against existing
+            MAU cohort
         """
         if self.hs.config.hs_disabled:
             raise AuthError(
-                403, self.hs.config.hs_disabled_message, errcode=Codes.HS_DISABLED
+                403, self.hs.config.hs_disabled_message,
+                errcode=Codes.RESOURCE_LIMIT_EXCEED,
+                admin_uri=self.hs.config.admin_uri,
             )
         if self.hs.config.limit_usage_by_mau is True:
+            # If the user is already part of the MAU cohort
+            if user_id:
+                timestamp = yield self.store.user_last_seen_monthly_active(user_id)
+                if timestamp:
+                    return
+            # Else if there is no room in the MAU bucket, bail
             current_mau = yield self.store.get_monthly_active_count()
             if current_mau >= self.hs.config.max_mau_value:
                 raise AuthError(
-                    403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED
+                    403, "Monthly Active User Limits AU Limit Exceeded",
+                    admin_uri=self.hs.config.admin_uri,
+                    errcode=Codes.RESOURCE_LIMIT_EXCEED
                 )
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index dc3bed5fcb..08f0cb5554 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -56,8 +56,7 @@ class Codes(object):
     SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
     CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
     CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
-    MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED"
-    HS_DISABLED = "M_HS_DISABLED"
+    RESOURCE_LIMIT_EXCEED = "M_RESOURCE_LIMIT_EXCEED"
     UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
     INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
 
@@ -225,11 +224,16 @@ class NotFoundError(SynapseError):
 
 class AuthError(SynapseError):
     """An error raised when there was a problem authorising an event."""
+    def __init__(self, code, msg, errcode=Codes.FORBIDDEN, admin_uri=None):
+        self.admin_uri = admin_uri
+        super(AuthError, self).__init__(code, msg, errcode=errcode)
 
-    def __init__(self, *args, **kwargs):
-        if "errcode" not in kwargs:
-            kwargs["errcode"] = Codes.FORBIDDEN
-        super(AuthError, self).__init__(*args, **kwargs)
+    def error_dict(self):
+        return cs_error(
+            self.msg,
+            self.errcode,
+            admin_uri=self.admin_uri,
+        )
 
 
 class EventSizeError(SynapseError):
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 6b77aec832..ab79a45646 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import (
     JoinedRoomMemberListRestServlet,
@@ -66,7 +66,7 @@ class ClientReaderSlavedStore(
     DirectoryStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
-    TransactionStore,
+    SlavedTransactionStore,
     SlavedClientIpStore,
     BaseSlavedStore,
 ):
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index a385793dd4..03d39968a8 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -43,7 +43,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import (
     JoinRoomAliasServlet,
@@ -63,7 +63,7 @@ logger = logging.getLogger("synapse.app.event_creator")
 
 class EventCreatorSlavedStore(
     DirectoryStore,
-    TransactionStore,
+    SlavedTransactionStore,
     SlavedProfileStore,
     SlavedAccountDataStore,
     SlavedPusherStore,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 57d96c13a2..52522e9d33 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -32,11 +32,16 @@ from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -49,11 +54,16 @@ logger = logging.getLogger("synapse.app.federation_reader")
 
 
 class FederationReaderSlavedStore(
+    SlavedProfileStore,
+    SlavedApplicationServiceStore,
+    SlavedPusherStore,
+    SlavedPushRuleStore,
+    SlavedReceiptsStore,
     SlavedEventStore,
     SlavedKeyStore,
     RoomStore,
     DirectoryStore,
-    TransactionStore,
+    SlavedTransactionStore,
     BaseSlavedStore,
 ):
     pass
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 7bbf0ad082..7a4310ca18 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -36,7 +36,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender")
 
 
 class FederationSenderSlaveStore(
-    SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+    SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
     SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
 ):
     def __init__(self, db_conn, hs):
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 37a9b126a5..a98bb506e5 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -303,8 +303,8 @@ class SynapseHomeServer(HomeServer):
 
 
 # Gauges to expose monthly active user control metrics
-current_mau_gauge = Gauge("synapse_admin_current_mau", "Current MAU")
-max_mau_value_gauge = Gauge("synapse_admin_max_mau_value", "MAU Limit")
+current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
 
 
 def setup(config_options):
@@ -532,7 +532,7 @@ def run(hs):
         if hs.config.limit_usage_by_mau:
             count = yield hs.get_datastore().get_monthly_active_count()
         current_mau_gauge.set(float(count))
-        max_mau_value_gauge.set(float(hs.config.max_mau_value))
+        max_mau_gauge.set(float(hs.config.max_mau_value))
 
     hs.get_datastore().initialise_reserved_users(
         hs.config.mau_limits_reserved_threepids
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 1423056732..fd1f6cbf7e 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer
@@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedClientIpStore,
-    TransactionStore,
+    SlavedTransactionStore,
     BaseSlavedStore,
     MediaRepositoryStore,
 ):
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 3b078d72ca..2190f3210a 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -82,6 +82,10 @@ class ServerConfig(Config):
         self.hs_disabled = config.get("hs_disabled", False)
         self.hs_disabled_message = config.get("hs_disabled_message", "")
 
+        # Admin uri to direct users at should their instance become blocked
+        # due to resource constraints
+        self.admin_uri = config.get("admin_uri", None)
+
         # FIXME: federation_domain_whitelist needs sytests
         self.federation_domain_whitelist = None
         federation_domain_whitelist = config.get(
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a23136784a..3e0cd294a1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -39,6 +39,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 from synapse.http.endpoint import parse_server_name
+from synapse.replication.http.federation import (
+    ReplicationFederationSendEduRestServlet,
+    ReplicationGetQueryRestServlet,
+)
 from synapse.types import get_domain_from_id
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
@@ -760,6 +764,8 @@ class FederationHandlerRegistry(object):
         if edu_type in self.edu_handlers:
             raise KeyError("Already have an EDU handler for %s" % (edu_type,))
 
+        logger.info("Registering federation EDU handler for %r", edu_type)
+
         self.edu_handlers[edu_type] = handler
 
     def register_query_handler(self, query_type, handler):
@@ -778,6 +784,8 @@ class FederationHandlerRegistry(object):
                 "Already have a Query handler for %s" % (query_type,)
             )
 
+        logger.info("Registering federation query handler for %r", query_type)
+
         self.query_handlers[query_type] = handler
 
     @defer.inlineCallbacks
@@ -800,3 +808,49 @@ class FederationHandlerRegistry(object):
             raise NotFoundError("No handler for Query type '%s'" % (query_type,))
 
         return handler(args)
+
+
+class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
+    """A FederationHandlerRegistry for worker processes.
+
+    When receiving EDU or queries it will check if an appropriate handler has
+    been registered on the worker, if there isn't one then it calls off to the
+    master process.
+    """
+
+    def __init__(self, hs):
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+        self.clock = hs.get_clock()
+
+        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+        self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+        super(ReplicationFederationHandlerRegistry, self).__init__()
+
+    def on_edu(self, edu_type, origin, content):
+        """Overrides FederationHandlerRegistry
+        """
+        handler = self.edu_handlers.get(edu_type)
+        if handler:
+            return super(ReplicationFederationHandlerRegistry, self).on_edu(
+                edu_type, origin, content,
+            )
+
+        return self._send_edu(
+                edu_type=edu_type,
+                origin=origin,
+                content=content,
+        )
+
+    def on_query(self, query_type, args):
+        """Overrides FederationHandlerRegistry
+        """
+        handler = self.query_handlers.get(query_type)
+        if handler:
+            return handler(args)
+
+        return self._get_query_client(
+                query_type=query_type,
+                args=args,
+        )
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 7ea8ce9f94..4a81bd2ba9 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -520,7 +520,7 @@ class AuthHandler(BaseHandler):
         """
         logger.info("Logging in user %s on device %s", user_id, device_id)
         access_token = yield self.issue_access_token(user_id, device_id)
-        yield self.auth.check_auth_blocking()
+        yield self.auth.check_auth_blocking(user_id)
 
         # the device *should* have been registered before we got here; however,
         # it's possible we raced against a DELETE operation. The thing we
@@ -734,7 +734,6 @@ class AuthHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def validate_short_term_login_token_and_get_user_id(self, login_token):
-        yield self.auth.check_auth_blocking()
         auth_api = self.hs.get_auth()
         user_id = None
         try:
@@ -743,6 +742,7 @@ class AuthHandler(BaseHandler):
             auth_api.validate_macaroon(macaroon, "login", True, user_id)
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+        yield self.auth.check_auth_blocking(user_id)
         defer.returnValue(user_id)
 
     @defer.inlineCallbacks
@@ -828,12 +828,26 @@ class AuthHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def delete_threepid(self, user_id, medium, address):
+        """Attempts to unbind the 3pid on the identity servers and deletes it
+        from the local database.
+
+        Args:
+            user_id (str)
+            medium (str)
+            address (str)
+
+        Returns:
+            Deferred[bool]: Returns True if successfully unbound the 3pid on
+            the identity server, False if identity server doesn't support the
+            unbind API.
+        """
+
         # 'Canonicalise' email addresses as per above
         if medium == 'email':
             address = address.lower()
 
         identity_handler = self.hs.get_handlers().identity_handler
-        yield identity_handler.unbind_threepid(
+        result = yield identity_handler.try_unbind_threepid(
             user_id,
             {
                 'medium': medium,
@@ -841,10 +855,10 @@ class AuthHandler(BaseHandler):
             },
         )
 
-        ret = yield self.store.user_delete_threepid(
+        yield self.store.user_delete_threepid(
             user_id, medium, address,
         )
-        defer.returnValue(ret)
+        defer.returnValue(result)
 
     def _save_session(self, session):
         # TODO: Persistent storage
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b3c5a9ee64..b078df4a76 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -51,7 +51,8 @@ class DeactivateAccountHandler(BaseHandler):
             erase_data (bool): whether to GDPR-erase the user's data
 
         Returns:
-            Deferred
+            Deferred[bool]: True if identity server supports removing
+            threepids, otherwise False.
         """
         # FIXME: Theoretically there is a race here wherein user resets
         # password using threepid.
@@ -60,16 +61,22 @@ class DeactivateAccountHandler(BaseHandler):
         # leave the user still active so they can try again.
         # Ideally we would prevent password resets and then do this in the
         # background thread.
+
+        # This will be set to false if the identity server doesn't support
+        # unbinding
+        identity_server_supports_unbinding = True
+
         threepids = yield self.store.user_get_threepids(user_id)
         for threepid in threepids:
             try:
-                yield self._identity_handler.unbind_threepid(
+                result = yield self._identity_handler.try_unbind_threepid(
                     user_id,
                     {
                         'medium': threepid['medium'],
                         'address': threepid['address'],
                     },
                 )
+                identity_server_supports_unbinding &= result
             except Exception:
                 # Do we want this to be a fatal error or should we carry on?
                 logger.exception("Failed to remove threepid from ID server")
@@ -103,6 +110,8 @@ class DeactivateAccountHandler(BaseHandler):
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
 
+        defer.returnValue(identity_server_supports_unbinding)
+
     def _start_user_parting(self):
         """
         Start the process that goes through the table of users
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2380d17f4e..f38b393e4a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -49,6 +49,11 @@ from synapse.crypto.event_signing import (
     compute_event_signature,
 )
 from synapse.events.validator import EventValidator
+from synapse.replication.http.federation import (
+    ReplicationCleanRoomRestServlet,
+    ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
 from synapse.state import resolve_events_with_factory
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import logcontext, unwrapFirstError
@@ -91,6 +96,18 @@ class FederationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self._server_notices_mxid = hs.config.server_notices_mxid
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+
+        self._send_events_to_master = (
+            ReplicationFederationSendEventsRestServlet.make_client(hs)
+        )
+        self._notify_user_membership_change = (
+            ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+        )
+        self._clean_room_for_join_client = (
+            ReplicationCleanRoomRestServlet.make_client(hs)
+        )
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
@@ -1158,7 +1175,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = yield self.state_handler.compute_event_context(event)
-        yield self._persist_events([(event, context)])
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
@@ -1189,7 +1206,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = yield self.state_handler.compute_event_context(event)
-        yield self._persist_events([(event, context)])
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
@@ -1432,7 +1449,7 @@ class FederationHandler(BaseHandler):
                     event, context
                 )
 
-            yield self._persist_events(
+            yield self.persist_events_and_notify(
                 [(event, context)],
                 backfilled=backfilled,
             )
@@ -1470,7 +1487,7 @@ class FederationHandler(BaseHandler):
             ], consumeErrors=True,
         ))
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [
                 (ev_info["event"], context)
                 for ev_info, context in zip(event_infos, contexts)
@@ -1558,7 +1575,7 @@ class FederationHandler(BaseHandler):
                     raise
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [
                 (e, events_to_context[e.event_id])
                 for e in itertools.chain(auth_events, state)
@@ -1569,7 +1586,7 @@ class FederationHandler(BaseHandler):
             event, old_state=state
         )
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [(event, new_event_context)],
         )
 
@@ -2297,7 +2314,7 @@ class FederationHandler(BaseHandler):
                 for revocation.
         """
         try:
-            response = yield self.hs.get_simple_http_client().get_json(
+            response = yield self.http_client.get_json(
                 url,
                 {"public_key": public_key}
             )
@@ -2310,7 +2327,7 @@ class FederationHandler(BaseHandler):
             raise AuthError(403, "Third party certificate was invalid")
 
     @defer.inlineCallbacks
-    def _persist_events(self, event_and_contexts, backfilled=False):
+    def persist_events_and_notify(self, event_and_contexts, backfilled=False):
         """Persists events and tells the notifier/pushers about them, if
         necessary.
 
@@ -2322,14 +2339,21 @@ class FederationHandler(BaseHandler):
         Returns:
             Deferred
         """
-        max_stream_id = yield self.store.persist_events(
-            event_and_contexts,
-            backfilled=backfilled,
-        )
+        if self.config.worker_app:
+            yield self._send_events_to_master(
+                store=self.store,
+                event_and_contexts=event_and_contexts,
+                backfilled=backfilled
+            )
+        else:
+            max_stream_id = yield self.store.persist_events(
+                event_and_contexts,
+                backfilled=backfilled,
+            )
 
-        if not backfilled:  # Never notify for backfilled events
-            for event, _ in event_and_contexts:
-                self._notify_persisted_event(event, max_stream_id)
+            if not backfilled:  # Never notify for backfilled events
+                for event, _ in event_and_contexts:
+                    self._notify_persisted_event(event, max_stream_id)
 
     def _notify_persisted_event(self, event, max_stream_id):
         """Checks to see if notifier/pushers should be notified about the
@@ -2368,9 +2392,25 @@ class FederationHandler(BaseHandler):
         )
 
     def _clean_room_for_join(self, room_id):
-        return self.store.clean_room_for_join(room_id)
+        """Called to clean up any data in DB for a given room, ready for the
+        server to join the room.
+
+        Args:
+            room_id (str)
+        """
+        if self.config.worker_app:
+            return self._clean_room_for_join_client(room_id)
+        else:
+            return self.store.clean_room_for_join(room_id)
 
     def user_joined_room(self, user, room_id):
         """Called when a new user has joined the room
         """
-        return user_joined_room(self.distributor, user, room_id)
+        if self.config.worker_app:
+            return self._notify_user_membership_change(
+                room_id=room_id,
+                user_id=user.to_string(),
+                change="joined",
+            )
+        else:
+            return user_joined_room(self.distributor, user, room_id)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 1d36d967c3..5feb3f22a6 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -137,15 +137,19 @@ class IdentityHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def unbind_threepid(self, mxid, threepid):
-        """
-        Removes a binding from an identity server
+    def try_unbind_threepid(self, mxid, threepid):
+        """Removes a binding from an identity server
+
         Args:
             mxid (str): Matrix user ID of binding to be removed
             threepid (dict): Dict with medium & address of binding to be removed
 
+        Raises:
+            SynapseError: If we failed to contact the identity server
+
         Returns:
-            Deferred[bool]: True on success, otherwise False
+            Deferred[bool]: True on success, otherwise False if the identity
+            server doesn't support unbinding
         """
         logger.debug("unbinding threepid %r from %s", threepid, mxid)
         if not self.trusted_id_servers:
@@ -175,11 +179,21 @@ class IdentityHandler(BaseHandler):
             content=content,
             destination_is=id_server,
         )
-        yield self.http_client.post_json_get_json(
-            url,
-            content,
-            headers,
-        )
+        try:
+            yield self.http_client.post_json_get_json(
+                url,
+                content,
+                headers,
+            )
+        except HttpResponseException as e:
+            if e.code in (400, 404, 501,):
+                # The remote server probably doesn't support unbinding (yet)
+                logger.warn("Received %d response while unbinding threepid", e.code)
+                defer.returnValue(False)
+            else:
+                logger.error("Failed to unbind threepid on identity server: %s", e)
+                raise SynapseError(502, "Failed to contact identity server")
+
         defer.returnValue(True)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 01a362360e..893c9bcdc4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -25,7 +25,13 @@ from twisted.internet import defer
 from twisted.internet.defer import succeed
 
 from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    Codes,
+    ConsentNotGivenError,
+    NotFoundError,
+    SynapseError,
+)
 from synapse.api.urls import ConsentURIBuilder
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
@@ -36,6 +42,7 @@ from synapse.util.async_helpers import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
@@ -82,28 +89,85 @@ class MessageHandler(object):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def get_state_events(self, user_id, room_id, is_guest=False):
+    def get_state_events(
+        self, user_id, room_id, types=None, filtered_types=None,
+        at_token=None, is_guest=False,
+    ):
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
-        left the room return the state events from when they left.
+        left the room return the state events from when they left. If an explicit
+        'at' parameter is passed, return the state events as of that event, if
+        visible.
 
         Args:
             user_id(str): The user requesting state events.
             room_id(str): The room ID to get all state events from.
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
+            at_token(StreamToken|None): the stream token of the at which we are requesting
+                the stats. If the user is not allowed to view the state as of that
+                stream token, we raise a 403 SynapseError. If None, returns the current
+                state based on the current_state_events table.
+            is_guest(bool): whether this user is a guest
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
+        Raises:
+            NotFoundError (404) if the at token does not yield an event
+
+            AuthError (403) if the user doesn't have permission to view
+            members of this room.
         """
-        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
-            room_id, user_id
-        )
+        if at_token:
+            # FIXME this claims to get the state at a stream position, but
+            # get_recent_events_for_room operates by topo ordering. This therefore
+            # does not reliably give you the state at the given stream position.
+            # (https://github.com/matrix-org/synapse/issues/3305)
+            last_events, _ = yield self.store.get_recent_events_for_room(
+                room_id, end_token=at_token.room_key, limit=1,
+            )
 
-        if membership == Membership.JOIN:
-            room_state = yield self.state.get_current_state(room_id)
-        elif membership == Membership.LEAVE:
-            room_state = yield self.store.get_state_for_events(
-                [membership_event_id], None
+            if not last_events:
+                raise NotFoundError("Can't find event for token %s" % (at_token, ))
+
+            visible_events = yield filter_events_for_client(
+                self.store, user_id, last_events,
+            )
+
+            event = last_events[0]
+            if visible_events:
+                room_state = yield self.store.get_state_for_events(
+                    [event.event_id], types, filtered_types=filtered_types,
+                )
+                room_state = room_state[event.event_id]
+            else:
+                raise AuthError(
+                    403,
+                    "User %s not allowed to view events in room %s at token %s" % (
+                        user_id, room_id, at_token,
+                    )
+                )
+        else:
+            membership, membership_event_id = (
+                yield self.auth.check_in_room_or_world_readable(
+                    room_id, user_id,
+                )
             )
-            room_state = room_state[membership_event_id]
+
+            if membership == Membership.JOIN:
+                state_ids = yield self.store.get_filtered_current_state_ids(
+                    room_id, types, filtered_types=filtered_types,
+                )
+                room_state = yield self.store.get_events(state_ids.values())
+            elif membership == Membership.LEAVE:
+                room_state = yield self.store.get_state_for_events(
+                    [membership_event_id], types, filtered_types=filtered_types,
+                )
+                room_state = room_state[membership_event_id]
 
         now = self.clock.time_msec()
         defer.returnValue(
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 3526b20d5a..f03ee1476b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -144,7 +144,8 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield self._check_mau_limits()
+
+        yield self.auth.check_auth_blocking()
         password_hash = None
         if password:
             password_hash = yield self.auth_handler().hash(password)
@@ -289,7 +290,7 @@ class RegistrationHandler(BaseHandler):
                 400,
                 "User ID can only contain characters a-z, 0-9, or '=_-./'",
             )
-        yield self._check_mau_limits()
+        yield self.auth.check_auth_blocking()
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
 
@@ -439,7 +440,7 @@ class RegistrationHandler(BaseHandler):
         """
         if localpart is None:
             raise SynapseError(400, "Request must include user id")
-        yield self._check_mau_limits()
+        yield self.auth.check_auth_blocking()
         need_register = True
 
         try:
@@ -533,14 +534,3 @@ class RegistrationHandler(BaseHandler):
             remote_room_hosts=remote_room_hosts,
             action="join",
         )
-
-    @defer.inlineCallbacks
-    def _check_mau_limits(self):
-        """
-        Do not accept registrations if monthly active user limits exceeded
-         and limiting is enabled
-        """
-        try:
-            yield self.auth.check_auth_blocking()
-        except AuthError as e:
-            raise RegistrationError(e.code, str(e), e.errcode)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6393a9674b..3b21a04a5d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -191,6 +191,7 @@ class SyncHandler(object):
         self.clock = hs.get_clock()
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
 
         # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
         self.lazy_loaded_members_cache = ExpiringCache(
@@ -198,19 +199,27 @@ class SyncHandler(object):
             max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
         )
 
+    @defer.inlineCallbacks
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
         Returns:
-            A Deferred SyncResult.
+            Deferred[SyncResult]
         """
-        return self.response_cache.wrap(
+        # If the user is not part of the mau group, then check that limits have
+        # not been exceeded (if not part of the group by this point, almost certain
+        # auth_blocking will occur)
+        user_id = sync_config.user.to_string()
+        yield self.auth.check_auth_blocking(user_id)
+
+        res = yield self.response_cache.wrap(
             sync_config.request_key,
             self._wait_for_sync_for_user,
             sync_config, since_token, timeout, full_state,
         )
+        defer.returnValue(res)
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 589ee94c66..19f214281e 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from synapse.http.server import JsonResource
-from synapse.replication.http import membership, send_event
+from synapse.replication.http import federation, membership, send_event
 
 REPLICATION_PREFIX = "/_synapse/replication"
 
@@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource):
     def register_servlets(self, hs):
         send_event.register_servlets(hs, self)
         membership.register_servlets(hs, self)
+        federation.register_servlets(hs, self)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
new file mode 100644
index 0000000000..2ddd18f73b
--- /dev/null
+++ b/synapse/replication/http/federation.py
@@ -0,0 +1,259 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
+    """Handles events newly received from federation, including persisting and
+    notifying.
+
+    The API looks like:
+
+        POST /_synapse/replication/fed_send_events/:txn_id
+
+        {
+            "events": [{
+                "event": { .. serialized event .. },
+                "internal_metadata": { .. serialized internal_metadata .. },
+                "rejected_reason": ..,   // The event.rejected_reason field
+                "context": { .. serialized event context .. },
+            }],
+            "backfilled": false
+    """
+
+    NAME = "fed_send_events"
+    PATH_ARGS = ()
+
+    def __init__(self, hs):
+        super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.federation_handler = hs.get_handlers().federation_handler
+
+    @staticmethod
+    @defer.inlineCallbacks
+    def _serialize_payload(store, event_and_contexts, backfilled):
+        """
+        Args:
+            store
+            event_and_contexts (list[tuple[FrozenEvent, EventContext]])
+            backfilled (bool): Whether or not the events are the result of
+                backfilling
+        """
+        event_payloads = []
+        for event, context in event_and_contexts:
+            serialized_context = yield context.serialize(event, store)
+
+            event_payloads.append({
+                "event": event.get_pdu_json(),
+                "internal_metadata": event.internal_metadata.get_dict(),
+                "rejected_reason": event.rejected_reason,
+                "context": serialized_context,
+            })
+
+        payload = {
+            "events": event_payloads,
+            "backfilled": backfilled,
+        }
+
+        defer.returnValue(payload)
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request):
+        with Measure(self.clock, "repl_fed_send_events_parse"):
+            content = parse_json_object_from_request(request)
+
+            backfilled = content["backfilled"]
+
+            event_payloads = content["events"]
+
+            event_and_contexts = []
+            for event_payload in event_payloads:
+                event_dict = event_payload["event"]
+                internal_metadata = event_payload["internal_metadata"]
+                rejected_reason = event_payload["rejected_reason"]
+                event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+                context = yield EventContext.deserialize(
+                    self.store, event_payload["context"],
+                )
+
+                event_and_contexts.append((event, context))
+
+        logger.info(
+            "Got %d events from federation",
+            len(event_and_contexts),
+        )
+
+        yield self.federation_handler.persist_events_and_notify(
+            event_and_contexts, backfilled,
+        )
+
+        defer.returnValue((200, {}))
+
+
+class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
+    """Handles EDUs newly received from federation, including persisting and
+    notifying.
+
+    Request format:
+
+        POST /_synapse/replication/fed_send_edu/:edu_type/:txn_id
+
+        {
+            "origin": ...,
+            "content: { ... }
+        }
+    """
+
+    NAME = "fed_send_edu"
+    PATH_ARGS = ("edu_type",)
+
+    def __init__(self, hs):
+        super(ReplicationFederationSendEduRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.registry = hs.get_federation_registry()
+
+    @staticmethod
+    def _serialize_payload(edu_type, origin, content):
+        return {
+            "origin": origin,
+            "content": content,
+        }
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request, edu_type):
+        with Measure(self.clock, "repl_fed_send_edu_parse"):
+            content = parse_json_object_from_request(request)
+
+            origin = content["origin"]
+            edu_content = content["content"]
+
+        logger.info(
+            "Got %r edu from $s",
+            edu_type, origin,
+        )
+
+        result = yield self.registry.on_edu(edu_type, origin, edu_content)
+
+        defer.returnValue((200, result))
+
+
+class ReplicationGetQueryRestServlet(ReplicationEndpoint):
+    """Handle responding to queries from federation.
+
+    Request format:
+
+        POST /_synapse/replication/fed_query/:query_type
+
+        {
+            "args": { ... }
+        }
+    """
+
+    NAME = "fed_query"
+    PATH_ARGS = ("query_type",)
+
+    # This is a query, so let's not bother caching
+    CACHE = False
+
+    def __init__(self, hs):
+        super(ReplicationGetQueryRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.registry = hs.get_federation_registry()
+
+    @staticmethod
+    def _serialize_payload(query_type, args):
+        """
+        Args:
+            query_type (str)
+            args (dict): The arguments received for the given query type
+        """
+        return {
+            "args": args,
+        }
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request, query_type):
+        with Measure(self.clock, "repl_fed_query_parse"):
+            content = parse_json_object_from_request(request)
+
+            args = content["args"]
+
+        logger.info(
+            "Got %r query",
+            query_type,
+        )
+
+        result = yield self.registry.on_query(query_type, args)
+
+        defer.returnValue((200, result))
+
+
+class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
+    """Called to clean up any data in DB for a given room, ready for the
+    server to join the room.
+
+    Request format:
+
+        POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
+
+        {}
+    """
+
+    NAME = "fed_cleanup_room"
+    PATH_ARGS = ("room_id",)
+
+    def __init__(self, hs):
+        super(ReplicationCleanRoomRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+
+    @staticmethod
+    def _serialize_payload(room_id, args):
+        """
+        Args:
+            room_id (str)
+        """
+        return {}
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request, room_id):
+        yield self.store.clean_room_for_join(room_id)
+
+        defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+    ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+    ReplicationFederationSendEduRestServlet(hs).register(http_server)
+    ReplicationGetQueryRestServlet(hs).register(http_server)
+    ReplicationCleanRoomRestServlet(hs).register(http_server)
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index 9c9a5eadd9..3527beb3c9 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -13,19 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage import DataStore
 from synapse.storage.transactions import TransactionStore
 
 from ._base import BaseSlavedStore
 
 
-class TransactionStore(BaseSlavedStore):
-    get_destination_retry_timings = TransactionStore.__dict__[
-        "get_destination_retry_timings"
-    ]
-    _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
-    set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
-    _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
-
-    prep_send_transaction = DataStore.prep_send_transaction.__func__
-    delivered_txn = DataStore.delivered_txn.__func__
+class SlavedTransactionStore(TransactionStore, BaseSlavedStore):
+    pass
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 80d625eecc..ad536ab570 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -391,10 +391,17 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
         if not is_admin:
             raise AuthError(403, "You are not a server admin")
 
-        yield self._deactivate_account_handler.deactivate_account(
+        result = yield self._deactivate_account_handler.deactivate_account(
             target_user_id, erase,
         )
-        defer.returnValue((200, {}))
+        if result:
+            id_server_unbind_result = "success"
+        else:
+            id_server_unbind_result = "no-support"
+
+        defer.returnValue((200, {
+            "id_server_unbind_result": id_server_unbind_result,
+        }))
 
 
 class ShutdownRoomRestServlet(ClientV1RestServlet):
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index fa5989e74e..fcc1091760 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -34,7 +34,7 @@ from synapse.http.servlet import (
     parse_string,
 )
 from synapse.streams.config import PaginationConfig
-from synapse.types import RoomAlias, RoomID, ThirdPartyInstanceID, UserID
+from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID
 
 from .base import ClientV1RestServlet, client_path_patterns
 
@@ -384,15 +384,39 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
     def on_GET(self, request, room_id):
         # TODO support Pagination stream API (limit/tokens)
         requester = yield self.auth.get_user_by_req(request)
-        events = yield self.message_handler.get_state_events(
+        handler = self.message_handler
+
+        # request the state as of a given event, as identified by a stream token,
+        # for consistency with /messages etc.
+        # useful for getting the membership in retrospect as of a given /sync
+        # response.
+        at_token_string = parse_string(request, "at")
+        if at_token_string is None:
+            at_token = None
+        else:
+            at_token = StreamToken.from_string(at_token_string)
+
+        # let you filter down on particular memberships.
+        # XXX: this may not be the best shape for this API - we could pass in a filter
+        # instead, except filters aren't currently aware of memberships.
+        # See https://github.com/matrix-org/matrix-doc/issues/1337 for more details.
+        membership = parse_string(request, "membership")
+        not_membership = parse_string(request, "not_membership")
+
+        events = yield handler.get_state_events(
             room_id=room_id,
             user_id=requester.user.to_string(),
+            at_token=at_token,
+            types=[(EventTypes.Member, None)],
         )
 
         chunk = []
 
         for event in events:
-            if event["type"] != EventTypes.Member:
+            if (
+                (membership and event['content'].get("membership") != membership) or
+                (not_membership and event['content'].get("membership") == not_membership)
+            ):
                 continue
             chunk.append(event)
 
@@ -401,6 +425,8 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
         }))
 
 
+# deprecated in favour of /members?membership=join?
+# except it does custom AS logic and has a simpler return format
 class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$")
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index eeae466d82..372648cafd 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -209,10 +209,17 @@ class DeactivateAccountRestServlet(RestServlet):
         yield self.auth_handler.validate_user_via_ui_auth(
             requester, body, self.hs.get_ip_from_request(request),
         )
-        yield self._deactivate_account_handler.deactivate_account(
+        result = yield self._deactivate_account_handler.deactivate_account(
             requester.user.to_string(), erase,
         )
-        defer.returnValue((200, {}))
+        if result:
+            id_server_unbind_result = "success"
+        else:
+            id_server_unbind_result = "no-support"
+
+        defer.returnValue((200, {
+            "id_server_unbind_result": id_server_unbind_result,
+        }))
 
 
 class EmailThreepidRequestTokenRestServlet(RestServlet):
@@ -364,7 +371,7 @@ class ThreepidDeleteRestServlet(RestServlet):
         user_id = requester.user.to_string()
 
         try:
-            yield self.auth_handler.delete_threepid(
+            ret = yield self.auth_handler.delete_threepid(
                 user_id, body['medium'], body['address']
             )
         except Exception:
@@ -374,7 +381,14 @@ class ThreepidDeleteRestServlet(RestServlet):
             logger.exception("Failed to remove threepid")
             raise SynapseError(500, "Failed to remove threepid")
 
-        defer.returnValue((200, {}))
+        if ret:
+            id_server_unbind_result = "success"
+        else:
+            id_server_unbind_result = "no-support"
+
+        defer.returnValue((200, {
+            "id_server_unbind_result": id_server_unbind_result,
+        }))
 
 
 class WhoamiRestServlet(RestServlet):
diff --git a/synapse/server.py b/synapse/server.py
index 140be9ebe8..26228d8c72 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient
 from synapse.federation.federation_server import (
     FederationHandlerRegistry,
     FederationServer,
+    ReplicationFederationHandlerRegistry,
 )
 from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.transaction_queue import TransactionQueue
@@ -423,7 +424,10 @@ class HomeServer(object):
         return RoomMemberMasterHandler(self)
 
     def build_federation_registry(self):
-        return FederationHandlerRegistry()
+        if self.config.worker_app:
+            return ReplicationFederationHandlerRegistry(self)
+        else:
+            return FederationHandlerRegistry()
 
     def build_server_notices_manager(self):
         if self.config.worker_app:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d4aa192a0a..025a7fb6d9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1436,88 +1436,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         )
 
     @defer.inlineCallbacks
-    def have_events_in_timeline(self, event_ids):
-        """Given a list of event ids, check if we have already processed and
-        stored them as non outliers.
-        """
-        rows = yield self._simple_select_many_batch(
-            table="events",
-            retcols=("event_id",),
-            column="event_id",
-            iterable=list(event_ids),
-            keyvalues={"outlier": False},
-            desc="have_events_in_timeline",
-        )
-
-        defer.returnValue(set(r["event_id"] for r in rows))
-
-    @defer.inlineCallbacks
-    def have_seen_events(self, event_ids):
-        """Given a list of event ids, check if we have already processed them.
-
-        Args:
-            event_ids (iterable[str]):
-
-        Returns:
-            Deferred[set[str]]: The events we have already seen.
-        """
-        results = set()
-
-        def have_seen_events_txn(txn, chunk):
-            sql = (
-                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
-                % (",".join("?" * len(chunk)), )
-            )
-            txn.execute(sql, chunk)
-            for (event_id, ) in txn:
-                results.add(event_id)
-
-        # break the input up into chunks of 100
-        input_iterator = iter(event_ids)
-        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
-                          []):
-            yield self.runInteraction(
-                "have_seen_events",
-                have_seen_events_txn,
-                chunk,
-            )
-        defer.returnValue(results)
-
-    def get_seen_events_with_rejections(self, event_ids):
-        """Given a list of event ids, check if we rejected them.
-
-        Args:
-            event_ids (list[str])
-
-        Returns:
-            Deferred[dict[str, str|None):
-                Has an entry for each event id we already have seen. Maps to
-                the rejected reason string if we rejected the event, else maps
-                to None.
-        """
-        if not event_ids:
-            return defer.succeed({})
-
-        def f(txn):
-            sql = (
-                "SELECT e.event_id, reason FROM events as e "
-                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
-                "WHERE e.event_id = ?"
-            )
-
-            res = {}
-            for event_id in event_ids:
-                txn.execute(sql, (event_id,))
-                row = txn.fetchone()
-                if row:
-                    _, rejected = row
-                    res[event_id] = rejected
-
-            return res
-
-        return self.runInteraction("get_rejection_reasons", f)
-
-    @defer.inlineCallbacks
     def count_daily_messages(self):
         """
         Returns an estimate of the number of messages sent in the last day.
@@ -1993,7 +1911,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         max_depth = max(row[0] for row in rows)
 
         if max_depth <= token.topological:
-            # We need to ensure we don't delete all the events from the datanase
+            # We need to ensure we don't delete all the events from the database
             # otherwise we wouldn't be able to send any events (due to not
             # having any backwards extremeties)
             raise SynapseError(
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 9b4cfeb899..59822178ff 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import itertools
 import logging
 from collections import namedtuple
 
@@ -442,3 +443,85 @@ class EventsWorkerStore(SQLBaseStore):
             self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
 
         defer.returnValue(cache_entry)
+
+    @defer.inlineCallbacks
+    def have_events_in_timeline(self, event_ids):
+        """Given a list of event ids, check if we have already processed and
+        stored them as non outliers.
+        """
+        rows = yield self._simple_select_many_batch(
+            table="events",
+            retcols=("event_id",),
+            column="event_id",
+            iterable=list(event_ids),
+            keyvalues={"outlier": False},
+            desc="have_events_in_timeline",
+        )
+
+        defer.returnValue(set(r["event_id"] for r in rows))
+
+    @defer.inlineCallbacks
+    def have_seen_events(self, event_ids):
+        """Given a list of event ids, check if we have already processed them.
+
+        Args:
+            event_ids (iterable[str]):
+
+        Returns:
+            Deferred[set[str]]: The events we have already seen.
+        """
+        results = set()
+
+        def have_seen_events_txn(txn, chunk):
+            sql = (
+                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+                % (",".join("?" * len(chunk)), )
+            )
+            txn.execute(sql, chunk)
+            for (event_id, ) in txn:
+                results.add(event_id)
+
+        # break the input up into chunks of 100
+        input_iterator = iter(event_ids)
+        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+                          []):
+            yield self.runInteraction(
+                "have_seen_events",
+                have_seen_events_txn,
+                chunk,
+            )
+        defer.returnValue(results)
+
+    def get_seen_events_with_rejections(self, event_ids):
+        """Given a list of event ids, check if we rejected them.
+
+        Args:
+            event_ids (list[str])
+
+        Returns:
+            Deferred[dict[str, str|None):
+                Has an entry for each event id we already have seen. Maps to
+                the rejected reason string if we rejected the event, else maps
+                to None.
+        """
+        if not event_ids:
+            return defer.succeed({})
+
+        def f(txn):
+            sql = (
+                "SELECT e.event_id, reason FROM events as e "
+                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+                "WHERE e.event_id = ?"
+            )
+
+            res = {}
+            for event_id in event_ids:
+                txn.execute(sql, (event_id,))
+                row = txn.fetchone()
+                if row:
+                    _, rejected = row
+                    res[event_id] = rejected
+
+            return res
+
+        return self.runInteraction("get_rejection_reasons", f)
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index d47dcef3a0..7e417f811e 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -46,7 +46,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
                 tp["medium"], tp["address"]
             )
             if user_id:
-                self.upsert_monthly_active_user(user_id)
+                yield self.upsert_monthly_active_user(user_id)
                 reserved_user_list.append(user_id)
             else:
                 logger.warning(
@@ -64,23 +64,27 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             Deferred[]
         """
         def _reap_users(txn):
+            # Purge stale users
 
             thirty_days_ago = (
                 int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
             )
-            # Purge stale users
-
-            # questionmarks is a hack to overcome sqlite not supporting
-            # tuples in 'WHERE IN %s'
-            questionmarks = '?' * len(self.reserved_users)
             query_args = [thirty_days_ago]
-            query_args.extend(self.reserved_users)
-
-            sql = """
-                DELETE FROM monthly_active_users
-                WHERE timestamp < ?
-                AND user_id NOT IN ({})
-                """.format(','.join(questionmarks))
+            base_sql = "DELETE FROM monthly_active_users WHERE timestamp < ?"
+
+            # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
+            # when len(reserved_users) == 0. Works fine on sqlite.
+            if len(self.reserved_users) > 0:
+                # questionmarks is a hack to overcome sqlite not supporting
+                # tuples in 'WHERE IN %s'
+                questionmarks = '?' * len(self.reserved_users)
+
+                query_args.extend(self.reserved_users)
+                sql = base_sql + """ AND user_id NOT IN ({})""".format(
+                    ','.join(questionmarks)
+                )
+            else:
+                sql = base_sql
 
             txn.execute(sql, query_args)
 
@@ -93,16 +97,24 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             # negative LIMIT values. So there is no way to write it that both can
             # support
             query_args = [self.hs.config.max_mau_value]
-            query_args.extend(self.reserved_users)
-            sql = """
+
+            base_sql = """
                 DELETE FROM monthly_active_users
                 WHERE user_id NOT IN (
                     SELECT user_id FROM monthly_active_users
                     ORDER BY timestamp DESC
                     LIMIT ?
                     )
-                AND user_id NOT IN ({})
-                """.format(','.join(questionmarks))
+                """
+            # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
+            # when len(reserved_users) == 0. Works fine on sqlite.
+            if len(self.reserved_users) > 0:
+                query_args.extend(self.reserved_users)
+                sql = base_sql + """ AND user_id NOT IN ({})""".format(
+                    ','.join(questionmarks)
+                )
+            else:
+                sql = base_sql
             txn.execute(sql, query_args)
 
         yield self.runInteraction("reap_monthly_active_users", _reap_users)
@@ -113,7 +125,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
         # is racy.
         # Have resolved to invalidate the whole cache for now and do
         # something about it if and when the perf becomes significant
-        self._user_last_seen_monthly_active.invalidate_all()
+        self.user_last_seen_monthly_active.invalidate_all()
         self.get_monthly_active_count.invalidate_all()
 
     @cached(num_args=0)
@@ -152,11 +164,11 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             lock=False,
         )
         if is_insert:
-            self._user_last_seen_monthly_active.invalidate((user_id,))
+            self.user_last_seen_monthly_active.invalidate((user_id,))
             self.get_monthly_active_count.invalidate(())
 
     @cached(num_args=1)
-    def _user_last_seen_monthly_active(self, user_id):
+    def user_last_seen_monthly_active(self, user_id):
         """
             Checks if a given user is part of the monthly active user group
             Arguments:
@@ -173,7 +185,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             },
             retcol="timestamp",
             allow_none=True,
-            desc="_user_last_seen_monthly_active",
+            desc="user_last_seen_monthly_active",
         ))
 
     @defer.inlineCallbacks
@@ -185,7 +197,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             user_id(str): the user_id to query
         """
         if self.hs.config.limit_usage_by_mau:
-            last_seen_timestamp = yield self._user_last_seen_monthly_active(user_id)
+            last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id)
             now = self.hs.get_clock().time_msec()
 
             # We want to reduce to the total number of db writes, and are happy
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 3147fb6827..3378fc77d1 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple(
 
 
 class RoomWorkerStore(SQLBaseStore):
+    def get_room(self, room_id):
+        """Retrieve a room.
+
+        Args:
+            room_id (str): The ID of the room to retrieve.
+        Returns:
+            A namedtuple containing the room information, or an empty list.
+        """
+        return self._simple_select_one(
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            retcols=("room_id", "is_public", "creator"),
+            desc="get_room",
+            allow_none=True,
+        )
+
     def get_public_room_ids(self):
         return self._simple_select_onecol(
             table="rooms",
@@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
             logger.error("store_room with room_id=%s failed: %s", room_id, e)
             raise StoreError(500, "Problem creating room.")
 
-    def get_room(self, room_id):
-        """Retrieve a room.
-
-        Args:
-            room_id (str): The ID of the room to retrieve.
-        Returns:
-            A namedtuple containing the room information, or an empty list.
-        """
-        return self._simple_select_one(
-            table="rooms",
-            keyvalues={"room_id": room_id},
-            retcols=("room_id", "is_public", "creator"),
-            desc="get_room",
-            allow_none=True,
-        )
-
     @defer.inlineCallbacks
     def set_room_is_public(self, room_id, is_public):
         def set_room_is_public_txn(txn, next_id):
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 17b14d464b..754dfa6973 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -116,6 +116,69 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             _get_current_state_ids_txn,
         )
 
+    # FIXME: how should this be cached?
+    def get_filtered_current_state_ids(self, room_id, types, filtered_types=None):
+        """Get the current state event of a given type for a room based on the
+        current_state_events table.  This may not be as up-to-date as the result
+        of doing a fresh state resolution as per state_handler.get_current_state
+        Args:
+            room_id (str)
+            types (list[(Str, (Str|None))]): List of (type, state_key) tuples
+                which are used to filter the state fetched. `state_key` may be
+                None, which matches any `state_key`
+            filtered_types (list[Str]|None): List of types to apply the above filter to.
+        Returns:
+            deferred: dict of (type, state_key) -> event
+        """
+
+        include_other_types = False if filtered_types is None else True
+
+        def _get_filtered_current_state_ids_txn(txn):
+            results = {}
+            sql = """SELECT type, state_key, event_id FROM current_state_events
+                     WHERE room_id = ? %s"""
+            # Turns out that postgres doesn't like doing a list of OR's and
+            # is about 1000x slower, so we just issue a query for each specific
+            # type seperately.
+            if types:
+                clause_to_args = [
+                    (
+                        "AND type = ? AND state_key = ?",
+                        (etype, state_key)
+                    ) if state_key is not None else (
+                        "AND type = ?",
+                        (etype,)
+                    )
+                    for etype, state_key in types
+                ]
+
+                if include_other_types:
+                    unique_types = set(filtered_types)
+                    clause_to_args.append(
+                        (
+                            "AND type <> ? " * len(unique_types),
+                            list(unique_types)
+                        )
+                    )
+            else:
+                # If types is None we fetch all the state, and so just use an
+                # empty where clause with no extra args.
+                clause_to_args = [("", [])]
+            for where_clause, where_args in clause_to_args:
+                args = [room_id]
+                args.extend(where_args)
+                txn.execute(sql % (where_clause,), args)
+                for row in txn:
+                    typ, state_key, event_id = row
+                    key = (intern_string(typ), intern_string(state_key))
+                    results[key] = event_id
+            return results
+
+        return self.runInteraction(
+            "get_filtered_current_state_ids",
+            _get_filtered_current_state_ids_txn,
+        )
+
     @cached(max_entries=10000, iterable=True)
     def get_state_group_delta(self, state_group):
         """Given a state group try to return a previous group and a delta between
@@ -389,8 +452,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 If None, `types` filtering is applied to all events.
 
         Returns:
-            deferred: A list of dicts corresponding to the event_ids given.
-            The dicts are mappings from (type, state_key) -> state_events
+            deferred: A dict of (event_id) -> (type, state_key) -> [state_events]
         """
         event_to_groups = yield self._get_state_group_for_events(
             event_ids,
diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py
index a65689ba89..32a2b5fc3d 100644
--- a/tests/api/test_auth.py
+++ b/tests/api/test_auth.py
@@ -455,8 +455,11 @@ class AuthTestCase(unittest.TestCase):
             return_value=defer.succeed(lots_of_users)
         )
 
-        with self.assertRaises(AuthError):
+        with self.assertRaises(AuthError) as e:
             yield self.auth.check_auth_blocking()
+        self.assertEquals(e.exception.admin_uri, self.hs.config.admin_uri)
+        self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED)
+        self.assertEquals(e.exception.code, 403)
 
         # Ensure does not throw an error
         self.store.get_monthly_active_count = Mock(
@@ -470,5 +473,6 @@ class AuthTestCase(unittest.TestCase):
         self.hs.config.hs_disabled_message = "Reason for being disabled"
         with self.assertRaises(AuthError) as e:
             yield self.auth.check_auth_blocking()
-        self.assertEquals(e.exception.errcode, Codes.HS_DISABLED)
+        self.assertEquals(e.exception.admin_uri, self.hs.config.admin_uri)
+        self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED)
         self.assertEquals(e.exception.code, 403)
diff --git a/tests/handlers/test_auth.py b/tests/handlers/test_auth.py
index 56c0f87fb7..3046bd6093 100644
--- a/tests/handlers/test_auth.py
+++ b/tests/handlers/test_auth.py
@@ -124,7 +124,7 @@ class AuthTestCase(unittest.TestCase):
         )
 
     @defer.inlineCallbacks
-    def test_mau_limits_exceeded(self):
+    def test_mau_limits_exceeded_large(self):
         self.hs.config.limit_usage_by_mau = True
         self.hs.get_datastore().get_monthly_active_count = Mock(
             return_value=defer.succeed(self.large_number_of_users)
@@ -142,6 +142,42 @@ class AuthTestCase(unittest.TestCase):
             )
 
     @defer.inlineCallbacks
+    def test_mau_limits_parity(self):
+        self.hs.config.limit_usage_by_mau = True
+
+        # If not in monthly active cohort
+        self.hs.get_datastore().get_monthly_active_count = Mock(
+            return_value=defer.succeed(self.hs.config.max_mau_value)
+        )
+        with self.assertRaises(AuthError):
+            yield self.auth_handler.get_access_token_for_user_id('user_a')
+
+        self.hs.get_datastore().get_monthly_active_count = Mock(
+            return_value=defer.succeed(self.hs.config.max_mau_value)
+        )
+        with self.assertRaises(AuthError):
+            yield self.auth_handler.validate_short_term_login_token_and_get_user_id(
+                self._get_macaroon().serialize()
+            )
+        # If in monthly active cohort
+        self.hs.get_datastore().user_last_seen_monthly_active = Mock(
+            return_value=defer.succeed(self.hs.get_clock().time_msec())
+        )
+        self.hs.get_datastore().get_monthly_active_count = Mock(
+            return_value=defer.succeed(self.hs.config.max_mau_value)
+        )
+        yield self.auth_handler.get_access_token_for_user_id('user_a')
+        self.hs.get_datastore().user_last_seen_monthly_active = Mock(
+            return_value=defer.succeed(self.hs.get_clock().time_msec())
+        )
+        self.hs.get_datastore().get_monthly_active_count = Mock(
+            return_value=defer.succeed(self.hs.config.max_mau_value)
+        )
+        yield self.auth_handler.validate_short_term_login_token_and_get_user_id(
+            self._get_macaroon().serialize()
+        )
+
+    @defer.inlineCallbacks
     def test_mau_limits_not_exceeded(self):
         self.hs.config.limit_usage_by_mau = True
 
diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py
index d48d40c8dd..7154816a34 100644
--- a/tests/handlers/test_register.py
+++ b/tests/handlers/test_register.py
@@ -17,7 +17,7 @@ from mock import Mock
 
 from twisted.internet import defer
 
-from synapse.api.errors import RegistrationError
+from synapse.api.errors import AuthError
 from synapse.handlers.register import RegistrationHandler
 from synapse.types import UserID, create_requester
 
@@ -98,7 +98,7 @@ class RegistrationTestCase(unittest.TestCase):
     def test_get_or_create_user_mau_not_blocked(self):
         self.hs.config.limit_usage_by_mau = True
         self.store.count_monthly_users = Mock(
-            return_value=defer.succeed(self.small_number_of_users)
+            return_value=defer.succeed(self.hs.config.max_mau_value - 1)
         )
         # Ensure does not throw exception
         yield self.handler.get_or_create_user("@user:server", 'c', "User")
@@ -109,7 +109,13 @@ class RegistrationTestCase(unittest.TestCase):
         self.store.get_monthly_active_count = Mock(
             return_value=defer.succeed(self.lots_of_users)
         )
-        with self.assertRaises(RegistrationError):
+        with self.assertRaises(AuthError):
+            yield self.handler.get_or_create_user("requester", 'b', "display_name")
+
+        self.store.get_monthly_active_count = Mock(
+            return_value=defer.succeed(self.hs.config.max_mau_value)
+        )
+        with self.assertRaises(AuthError):
             yield self.handler.get_or_create_user("requester", 'b', "display_name")
 
     @defer.inlineCallbacks
@@ -118,7 +124,13 @@ class RegistrationTestCase(unittest.TestCase):
         self.store.get_monthly_active_count = Mock(
             return_value=defer.succeed(self.lots_of_users)
         )
-        with self.assertRaises(RegistrationError):
+        with self.assertRaises(AuthError):
+            yield self.handler.register(localpart="local_part")
+
+        self.store.get_monthly_active_count = Mock(
+            return_value=defer.succeed(self.hs.config.max_mau_value)
+        )
+        with self.assertRaises(AuthError):
             yield self.handler.register(localpart="local_part")
 
     @defer.inlineCallbacks
@@ -127,5 +139,11 @@ class RegistrationTestCase(unittest.TestCase):
         self.store.get_monthly_active_count = Mock(
             return_value=defer.succeed(self.lots_of_users)
         )
-        with self.assertRaises(RegistrationError):
+        with self.assertRaises(AuthError):
+            yield self.handler.register_saml2(localpart="local_part")
+
+        self.store.get_monthly_active_count = Mock(
+            return_value=defer.succeed(self.hs.config.max_mau_value)
+        )
+        with self.assertRaises(AuthError):
             yield self.handler.register_saml2(localpart="local_part")
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
new file mode 100644
index 0000000000..33d861bd64
--- /dev/null
+++ b/tests/handlers/test_sync.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from synapse.api.errors import AuthError, Codes
+from synapse.api.filtering import DEFAULT_FILTER_COLLECTION
+from synapse.handlers.sync import SyncConfig, SyncHandler
+from synapse.types import UserID
+
+import tests.unittest
+import tests.utils
+from tests.utils import setup_test_homeserver
+
+
+class SyncTestCase(tests.unittest.TestCase):
+    """ Tests Sync Handler. """
+
+    @defer.inlineCallbacks
+    def setUp(self):
+        self.hs = yield setup_test_homeserver(self.addCleanup)
+        self.sync_handler = SyncHandler(self.hs)
+        self.store = self.hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def test_wait_for_sync_for_user_auth_blocking(self):
+
+        user_id1 = "@user1:server"
+        user_id2 = "@user2:server"
+        sync_config = self._generate_sync_config(user_id1)
+
+        self.hs.config.limit_usage_by_mau = True
+        self.hs.config.max_mau_value = 1
+
+        # Check that the happy case does not throw errors
+        yield self.store.upsert_monthly_active_user(user_id1)
+        yield self.sync_handler.wait_for_sync_for_user(sync_config)
+
+        # Test that global lock works
+        self.hs.config.hs_disabled = True
+        with self.assertRaises(AuthError) as e:
+            yield self.sync_handler.wait_for_sync_for_user(sync_config)
+        self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED)
+
+        self.hs.config.hs_disabled = False
+
+        sync_config = self._generate_sync_config(user_id2)
+
+        with self.assertRaises(AuthError) as e:
+            yield self.sync_handler.wait_for_sync_for_user(sync_config)
+        self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED)
+
+    def _generate_sync_config(self, user_id):
+        return SyncConfig(
+            user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]),
+            filter_collection=DEFAULT_FILTER_COLLECTION,
+            is_guest=False,
+            request_key="request_key",
+            device_id="device_id",
+        )
diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py
index 677265edf6..0ad814c5e5 100644
--- a/tests/rest/client/v1/test_typing.py
+++ b/tests/rest/client/v1/test_typing.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,41 +18,32 @@
 
 from mock import Mock, NonCallableMock
 
-# twisted imports
 from twisted.internet import defer
 
-import synapse.rest.client.v1.room
+from synapse.rest.client.v1 import room
 from synapse.types import UserID
 
-from ....utils import MockClock, MockHttpResource, setup_test_homeserver
-from .utils import RestTestCase
+from tests import unittest
 
 PATH_PREFIX = "/_matrix/client/api/v1"
 
 
-class RoomTypingTestCase(RestTestCase):
+class RoomTypingTestCase(unittest.HomeserverTestCase):
     """ Tests /rooms/$room_id/typing/$user_id REST API. """
 
     user_id = "@sid:red"
 
     user = UserID.from_string(user_id)
+    servlets = [room.register_servlets]
 
-    @defer.inlineCallbacks
-    def setUp(self):
-        self.clock = MockClock()
+    def make_homeserver(self, reactor, clock):
 
-        self.mock_resource = MockHttpResource(prefix=PATH_PREFIX)
-        self.auth_user_id = self.user_id
-
-        hs = yield setup_test_homeserver(
-            self.addCleanup,
+        hs = self.setup_test_homeserver(
             "red",
-            clock=self.clock,
             http_client=None,
             federation_client=Mock(),
             ratelimiter=NonCallableMock(spec_set=["send_message"]),
         )
-        self.hs = hs
 
         self.event_source = hs.get_event_sources().sources["typing"]
 
@@ -100,25 +92,24 @@ class RoomTypingTestCase(RestTestCase):
             fetch_room_distributions_into
         )
 
-        synapse.rest.client.v1.room.register_servlets(hs, self.mock_resource)
+        return hs
 
-        self.room_id = yield self.create_room_as(self.user_id)
+    def prepare(self, reactor, clock, hs):
+        self.room_id = self.helper.create_room_as(self.user_id)
         # Need another user to make notifications actually work
-        yield self.join(self.room_id, user="@jim:red")
+        self.helper.join(self.room_id, user="@jim:red")
 
-    @defer.inlineCallbacks
     def test_set_typing(self):
-        (code, _) = yield self.mock_resource.trigger(
+        request, channel = self.make_request(
             "PUT",
             "/rooms/%s/typing/%s" % (self.room_id, self.user_id),
-            '{"typing": true, "timeout": 30000}',
+            b'{"typing": true, "timeout": 30000}',
         )
-        self.assertEquals(200, code)
+        self.render(request)
+        self.assertEquals(200, channel.code)
 
         self.assertEquals(self.event_source.get_current_key(), 1)
-        events = yield self.event_source.get_new_events(
-            from_key=0, room_ids=[self.room_id]
-        )
+        events = self.event_source.get_new_events(from_key=0, room_ids=[self.room_id])
         self.assertEquals(
             events[0],
             [
@@ -130,35 +121,36 @@ class RoomTypingTestCase(RestTestCase):
             ],
         )
 
-    @defer.inlineCallbacks
     def test_set_not_typing(self):
-        (code, _) = yield self.mock_resource.trigger(
+        request, channel = self.make_request(
             "PUT",
             "/rooms/%s/typing/%s" % (self.room_id, self.user_id),
-            '{"typing": false}',
+            b'{"typing": false}',
         )
-        self.assertEquals(200, code)
+        self.render(request)
+        self.assertEquals(200, channel.code)
 
-    @defer.inlineCallbacks
     def test_typing_timeout(self):
-        (code, _) = yield self.mock_resource.trigger(
+        request, channel = self.make_request(
             "PUT",
             "/rooms/%s/typing/%s" % (self.room_id, self.user_id),
-            '{"typing": true, "timeout": 30000}',
+            b'{"typing": true, "timeout": 30000}',
         )
-        self.assertEquals(200, code)
+        self.render(request)
+        self.assertEquals(200, channel.code)
 
         self.assertEquals(self.event_source.get_current_key(), 1)
 
-        self.clock.advance_time(36)
+        self.reactor.advance(36)
 
         self.assertEquals(self.event_source.get_current_key(), 2)
 
-        (code, _) = yield self.mock_resource.trigger(
+        request, channel = self.make_request(
             "PUT",
             "/rooms/%s/typing/%s" % (self.room_id, self.user_id),
-            '{"typing": true, "timeout": 30000}',
+            b'{"typing": true, "timeout": 30000}',
         )
-        self.assertEquals(200, code)
+        self.render(request)
+        self.assertEquals(200, channel.code)
 
         self.assertEquals(self.event_source.get_current_key(), 3)
diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py
index fa60d949ba..c2e88bdbaf 100644
--- a/tests/storage/test_client_ips.py
+++ b/tests/storage/test_client_ips.py
@@ -63,7 +63,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
         yield self.store.insert_client_ip(
             user_id, "access_token", "ip", "user_agent", "device_id"
         )
-        active = yield self.store._user_last_seen_monthly_active(user_id)
+        active = yield self.store.user_last_seen_monthly_active(user_id)
         self.assertFalse(active)
 
     @defer.inlineCallbacks
@@ -79,7 +79,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
         yield self.store.insert_client_ip(
             user_id, "access_token", "ip", "user_agent", "device_id"
         )
-        active = yield self.store._user_last_seen_monthly_active(user_id)
+        active = yield self.store.user_last_seen_monthly_active(user_id)
         self.assertFalse(active)
 
     @defer.inlineCallbacks
@@ -87,13 +87,13 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
         self.hs.config.limit_usage_by_mau = True
         self.hs.config.max_mau_value = 50
         user_id = "@user:server"
-        active = yield self.store._user_last_seen_monthly_active(user_id)
+        active = yield self.store.user_last_seen_monthly_active(user_id)
         self.assertFalse(active)
 
         yield self.store.insert_client_ip(
             user_id, "access_token", "ip", "user_agent", "device_id"
         )
-        active = yield self.store._user_last_seen_monthly_active(user_id)
+        active = yield self.store.user_last_seen_monthly_active(user_id)
         self.assertTrue(active)
 
     @defer.inlineCallbacks
@@ -102,7 +102,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
         self.hs.config.max_mau_value = 50
         user_id = "@user:server"
 
-        active = yield self.store._user_last_seen_monthly_active(user_id)
+        active = yield self.store.user_last_seen_monthly_active(user_id)
         self.assertFalse(active)
 
         yield self.store.insert_client_ip(
@@ -111,5 +111,5 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
         yield self.store.insert_client_ip(
             user_id, "access_token", "ip", "user_agent", "device_id"
         )
-        active = yield self.store._user_last_seen_monthly_active(user_id)
+        active = yield self.store.user_last_seen_monthly_active(user_id)
         self.assertTrue(active)
diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py
index 0a2c859f26..511acbde9b 100644
--- a/tests/storage/test_monthly_active_users.py
+++ b/tests/storage/test_monthly_active_users.py
@@ -33,7 +33,7 @@ class MonthlyActiveUsersTestCase(tests.unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_initialise_reserved_users(self):
-
+        self.hs.config.max_mau_value = 5
         user1 = "@user1:server"
         user1_email = "user1@matrix.org"
         user2 = "@user2:server"
@@ -60,9 +60,9 @@ class MonthlyActiveUsersTestCase(tests.unittest.TestCase):
 
         # Test user is marked as active
 
-        timestamp = yield self.store._user_last_seen_monthly_active(user1)
+        timestamp = yield self.store.user_last_seen_monthly_active(user1)
         self.assertTrue(timestamp)
-        timestamp = yield self.store._user_last_seen_monthly_active(user2)
+        timestamp = yield self.store.user_last_seen_monthly_active(user2)
         self.assertTrue(timestamp)
 
         # Test that users are never removed from the db.
@@ -86,17 +86,18 @@ class MonthlyActiveUsersTestCase(tests.unittest.TestCase):
         self.assertEqual(1, count)
 
     @defer.inlineCallbacks
-    def test__user_last_seen_monthly_active(self):
+    def test_user_last_seen_monthly_active(self):
         user_id1 = "@user1:server"
         user_id2 = "@user2:server"
         user_id3 = "@user3:server"
-        result = yield self.store._user_last_seen_monthly_active(user_id1)
+
+        result = yield self.store.user_last_seen_monthly_active(user_id1)
         self.assertFalse(result == 0)
         yield self.store.upsert_monthly_active_user(user_id1)
         yield self.store.upsert_monthly_active_user(user_id2)
-        result = yield self.store._user_last_seen_monthly_active(user_id1)
+        result = yield self.store.user_last_seen_monthly_active(user_id1)
         self.assertTrue(result > 0)
-        result = yield self.store._user_last_seen_monthly_active(user_id3)
+        result = yield self.store.user_last_seen_monthly_active(user_id3)
         self.assertFalse(result == 0)
 
     @defer.inlineCallbacks
diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py
index 6168c46248..ebfd969b36 100644
--- a/tests/storage/test_state.py
+++ b/tests/storage/test_state.py
@@ -148,7 +148,7 @@ class StateStoreTestCase(tests.unittest.TestCase):
             {(e3.type, e3.state_key): e3, (e5.type, e5.state_key): e5}, state
         )
 
-        # check we can use filter_types to grab a specific room member
+        # check we can use filtered_types to grab a specific room member
         # without filtering out the other event types
         state = yield self.store.get_state_for_event(
             e5.event_id,
diff --git a/tests/unittest.py b/tests/unittest.py
index f448a6dfbd..e6afe3b96d 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,12 +16,19 @@
 
 import logging
 
+from mock import Mock
+
 import twisted
 import twisted.logger
 from twisted.trial import unittest
 
+from synapse.http.server import JsonResource
+from synapse.server import HomeServer
+from synapse.types import UserID, create_requester
 from synapse.util.logcontext import LoggingContextFilter
 
+from tests.server import get_clock, make_request, render, setup_test_homeserver
+
 # Set up putting Synapse's logs into Trial's.
 rootLogger = logging.getLogger()
 
@@ -129,3 +137,139 @@ def DEBUG(target):
     Can apply to either a TestCase or an individual test method."""
     target.loglevel = logging.DEBUG
     return target
+
+
+class HomeserverTestCase(TestCase):
+    """
+    A base TestCase that reduces boilerplate for HomeServer-using test cases.
+
+    Attributes:
+        servlets (list[function]): List of servlet registration function.
+        user_id (str): The user ID to assume if auth is hijacked.
+        hijack_auth (bool): Whether to hijack auth to return the user specified
+        in user_id.
+    """
+    servlets = []
+    hijack_auth = True
+
+    def setUp(self):
+        """
+        Set up the TestCase by calling the homeserver constructor, optionally
+        hijacking the authentication system to return a fixed user, and then
+        calling the prepare function.
+        """
+        self.reactor, self.clock = get_clock()
+        self._hs_args = {"clock": self.clock, "reactor": self.reactor}
+        self.hs = self.make_homeserver(self.reactor, self.clock)
+
+        if self.hs is None:
+            raise Exception("No homeserver returned from make_homeserver.")
+
+        if not isinstance(self.hs, HomeServer):
+            raise Exception("A homeserver wasn't returned, but %r" % (self.hs,))
+
+        # Register the resources
+        self.resource = JsonResource(self.hs)
+
+        for servlet in self.servlets:
+            servlet(self.hs, self.resource)
+
+        if hasattr(self, "user_id"):
+            from tests.rest.client.v1.utils import RestHelper
+
+            self.helper = RestHelper(self.hs, self.resource, self.user_id)
+
+            if self.hijack_auth:
+
+                def get_user_by_access_token(token=None, allow_guest=False):
+                    return {
+                        "user": UserID.from_string(self.helper.auth_user_id),
+                        "token_id": 1,
+                        "is_guest": False,
+                    }
+
+                def get_user_by_req(request, allow_guest=False, rights="access"):
+                    return create_requester(
+                        UserID.from_string(self.helper.auth_user_id), 1, False, None
+                    )
+
+                self.hs.get_auth().get_user_by_req = get_user_by_req
+                self.hs.get_auth().get_user_by_access_token = get_user_by_access_token
+                self.hs.get_auth().get_access_token_from_request = Mock(
+                    return_value="1234"
+                )
+
+        if hasattr(self, "prepare"):
+            self.prepare(self.reactor, self.clock, self.hs)
+
+    def make_homeserver(self, reactor, clock):
+        """
+        Make and return a homeserver.
+
+        Args:
+            reactor: A Twisted Reactor, or something that pretends to be one.
+            clock (synapse.util.Clock): The Clock, associated with the reactor.
+
+        Returns:
+            A homeserver (synapse.server.HomeServer) suitable for testing.
+
+        Function to be overridden in subclasses.
+        """
+        raise NotImplementedError()
+
+    def prepare(self, reactor, clock, homeserver):
+        """
+        Prepare for the test.  This involves things like mocking out parts of
+        the homeserver, or building test data common across the whole test
+        suite.
+
+        Args:
+            reactor: A Twisted Reactor, or something that pretends to be one.
+            clock (synapse.util.Clock): The Clock, associated with the reactor.
+            homeserver (synapse.server.HomeServer): The HomeServer to test
+            against.
+
+        Function to optionally be overridden in subclasses.
+        """
+
+    def make_request(self, method, path, content=b""):
+        """
+        Create a SynapseRequest at the path using the method and containing the
+        given content.
+
+        Args:
+            method (bytes/unicode): The HTTP request method ("verb").
+            path (bytes/unicode): The HTTP path, suitably URL encoded (e.g.
+            escaped UTF-8 & spaces and such).
+            content (bytes): The body of the request.
+
+        Returns:
+            A synapse.http.site.SynapseRequest.
+        """
+        return make_request(method, path, content)
+
+    def render(self, request):
+        """
+        Render a request against the resources registered by the test class's
+        servlets.
+
+        Args:
+            request (synapse.http.site.SynapseRequest): The request to render.
+        """
+        render(request, self.resource, self.reactor)
+
+    def setup_test_homeserver(self, *args, **kwargs):
+        """
+        Set up the test homeserver, meant to be called by the overridable
+        make_homeserver. It automatically passes through the test class's
+        clock & reactor.
+
+        Args:
+            See tests.utils.setup_test_homeserver.
+
+        Returns:
+            synapse.server.HomeServer
+        """
+        kwargs = dict(kwargs)
+        kwargs.update(self._hs_args)
+        return setup_test_homeserver(self.addCleanup, *args, **kwargs)
diff --git a/tests/utils.py b/tests/utils.py
index 90378326f8..52326d4f67 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -139,6 +139,7 @@ def setup_test_homeserver(
         config.hs_disabled_message = ""
         config.max_mau_value = 50
         config.mau_limits_reserved_threepids = []
+        config.admin_uri = None
 
         # we need a sane default_room_version, otherwise attempts to create rooms will
         # fail.