summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/__init__.py4
-rw-r--r--synapse/app/_base.py17
-rw-r--r--synapse/app/appservice.py18
-rw-r--r--synapse/app/client_reader.py34
-rw-r--r--synapse/app/event_creator.py34
-rw-r--r--synapse/app/federation_reader.py30
-rw-r--r--synapse/app/federation_sender.py23
-rw-r--r--synapse/app/frontend_proxy.py57
-rwxr-xr-xsynapse/app/homeserver.py106
-rw-r--r--synapse/app/media_repository.py22
-rw-r--r--synapse/app/pusher.py19
-rw-r--r--synapse/app/synchrotron.py42
-rwxr-xr-xsynapse/app/synctl.py9
-rw-r--r--synapse/app/user_dir.py17
14 files changed, 281 insertions, 151 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 9c2b627590..3b6b9368b8 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -14,9 +14,11 @@
 # limitations under the License.
 
 import sys
+
+from synapse import python_dependencies  # noqa: E402
+
 sys.dont_write_bytecode = True
 
-from synapse import python_dependencies   # noqa: E402
 
 try:
     python_dependencies.check_requirements()
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index a6925ab139..7c866e246a 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -17,15 +17,18 @@ import gc
 import logging
 import sys
 
+from daemonize import Daemonize
+
+from twisted.internet import error, reactor
+
+from synapse.util import PreserveLoggingContext
+from synapse.util.rlimit import change_resource_limit
+
 try:
     import affinity
 except Exception:
     affinity = None
 
-from daemonize import Daemonize
-from synapse.util import PreserveLoggingContext
-from synapse.util.rlimit import change_resource_limit
-from twisted.internet import error, reactor
 
 logger = logging.getLogger(__name__)
 
@@ -137,7 +140,7 @@ def listen_metrics(bind_addresses, port):
         logger.info("Metrics now reporting on %s:%d", host, port)
 
 
-def listen_tcp(bind_addresses, port, factory, backlog=50):
+def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
     """
     Create a TCP socket for a port and several addresses
     """
@@ -153,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50):
             check_bind_error(e, address, bind_addresses)
 
 
-def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
+def listen_ssl(
+    bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
+):
     """
     Create an SSL socket for a port and several addresses
     """
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index dd114dee07..86b5067400 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -16,6 +16,9 @@
 import logging
 import sys
 
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
 import synapse
 from synapse import events
 from synapse.app import _base
@@ -23,6 +26,7 @@ from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
@@ -35,8 +39,6 @@ from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor, defer
-from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.appservice")
 
@@ -49,10 +51,7 @@ class AppserviceSlaveStore(
 
 
 class AppserviceServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = AppserviceSlaveStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -62,7 +61,7 @@ class AppserviceServer(HomeServer):
         for res in listener_config["resources"]:
             for name in res["names"]:
                 if name == "metrics":
-                    resources[METRICS_PREFIX] = MetricsResource(self)
+                    resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -97,7 +96,7 @@ class AppserviceServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -115,8 +114,9 @@ class ASReplicationHandler(ReplicationClientHandler):
         super(ASReplicationHandler, self).__init__(hs.get_datastore())
         self.appservice_handler = hs.get_application_service_handler()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
 
         if stream_name == "events":
             max_stream_id = self.store.get_room_max_stream_ordering()
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 85dada7f9f..ce2b113dbb 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -16,6 +16,9 @@
 import logging
 import sys
 
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+
 import synapse
 from synapse import events
 from synapse.app import _base
@@ -28,6 +31,7 @@ from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.directory import DirectoryStore
@@ -35,29 +39,34 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.rest.client.v1.room import PublicRoomListRestServlet
+from synapse.rest.client.v1.room import (
+    JoinedRoomMemberListRestServlet,
+    PublicRoomListRestServlet,
+    RoomEventContextServlet,
+    RoomMemberListRestServlet,
+    RoomStateRestServlet,
+)
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.client_reader")
 
 
 class ClientReaderSlavedStore(
+    SlavedAccountDataStore,
     SlavedEventStore,
     SlavedKeyStore,
     RoomStore,
     DirectoryStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
-    TransactionStore,
+    SlavedTransactionStore,
     SlavedClientIpStore,
     BaseSlavedStore,
 ):
@@ -65,10 +74,7 @@ class ClientReaderSlavedStore(
 
 
 class ClientReaderServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = ClientReaderSlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -81,7 +87,13 @@ class ClientReaderServer(HomeServer):
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
+
                     PublicRoomListRestServlet(self).register(resource)
+                    RoomMemberListRestServlet(self).register(resource)
+                    JoinedRoomMemberListRestServlet(self).register(resource)
+                    RoomStateRestServlet(self).register(resource)
+                    RoomEventContextServlet(self).register(resource)
+
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,
@@ -122,7 +134,7 @@ class ClientReaderServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -153,11 +165,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = ClientReaderServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 5ca77c0f1a..f98e456ea0 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -16,6 +16,9 @@
 import logging
 import sys
 
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+
 import synapse
 from synapse import events
 from synapse.app import _base
@@ -40,27 +43,36 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.profile import (
+    ProfileAvatarURLRestServlet,
+    ProfileDisplaynameRestServlet,
+    ProfileRestServlet,
+)
 from synapse.rest.client.v1.room import (
-    RoomSendEventRestServlet, RoomMembershipRestServlet, RoomStateEventRestServlet,
     JoinRoomAliasServlet,
+    RoomMembershipRestServlet,
+    RoomSendEventRestServlet,
+    RoomStateEventRestServlet,
 )
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
+from synapse.storage.user_directory import UserDirectoryStore
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.event_creator")
 
 
 class EventCreatorSlavedStore(
+    # FIXME(#3714): We need to add UserDirectoryStore as we write directly
+    # rather than going via the correct worker.
+    UserDirectoryStore,
     DirectoryStore,
-    TransactionStore,
+    SlavedTransactionStore,
     SlavedProfileStore,
     SlavedAccountDataStore,
     SlavedPusherStore,
@@ -78,10 +90,7 @@ class EventCreatorSlavedStore(
 
 
 class EventCreatorServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = EventCreatorSlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -98,6 +107,9 @@ class EventCreatorServer(HomeServer):
                     RoomMembershipRestServlet(self).register(resource)
                     RoomStateEventRestServlet(self).register(resource)
                     JoinRoomAliasServlet(self).register(resource)
+                    ProfileAvatarURLRestServlet(self).register(resource)
+                    ProfileDisplaynameRestServlet(self).register(resource)
+                    ProfileRestServlet(self).register(resource)
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,
@@ -138,7 +150,7 @@ class EventCreatorServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -171,11 +183,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = EventCreatorServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 2a1995d0cd..60f5973505 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -16,6 +16,9 @@
 import logging
 import sys
 
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+
 import synapse
 from synapse import events
 from synapse.api.urls import FEDERATION_PREFIX
@@ -29,11 +32,17 @@ from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -41,28 +50,29 @@ from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.federation_reader")
 
 
 class FederationReaderSlavedStore(
+    SlavedAccountDataStore,
+    SlavedProfileStore,
+    SlavedApplicationServiceStore,
+    SlavedPusherStore,
+    SlavedPushRuleStore,
+    SlavedReceiptsStore,
     SlavedEventStore,
     SlavedKeyStore,
     RoomStore,
     DirectoryStore,
-    TransactionStore,
+    SlavedTransactionStore,
     BaseSlavedStore,
 ):
     pass
 
 
 class FederationReaderServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = FederationReaderSlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -111,7 +121,7 @@ class FederationReaderServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -142,11 +152,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = FederationReaderServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 81ad574043..60dd09aac3 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -16,6 +16,9 @@
 import logging
 import sys
 
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
 import synapse
 from synapse import events
 from synapse.app import _base
@@ -33,23 +36,21 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import defer, reactor
-from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.federation_sender")
 
 
 class FederationSenderSlaveStore(
-    SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+    SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
     SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
 ):
     def __init__(self, db_conn, hs):
@@ -77,10 +78,7 @@ class FederationSenderSlaveStore(
 
 
 class FederationSenderServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = FederationSenderSlaveStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -125,7 +123,7 @@ class FederationSenderServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -143,8 +141,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
         super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
         self.send_handler = FederationSenderHandler(hs, self)
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(FederationSenderReplicationHandler, self).on_rdata(
+        yield super(FederationSenderReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         self.send_handler.process_replication_rows(stream_name, token, rows)
@@ -185,11 +184,13 @@ def start(config_options):
     config.send_federation = True
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ps = FederationSenderServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 5a164a7a95..8c0b9c67b0 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -16,6 +16,9 @@
 import logging
 import sys
 
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
 import synapse
 from synapse import events
 from synapse.api.errors import SynapseError
@@ -25,9 +28,7 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.crypto import context_factory
 from synapse.http.server import JsonResource
-from synapse.http.servlet import (
-    RestServlet, parse_json_object_from_request,
-)
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
@@ -37,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
 from synapse.rest.client.v2_alpha._base import client_v2_patterns
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -44,12 +46,39 @@ from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import defer, reactor
-from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.frontend_proxy")
 
 
+class PresenceStatusStubServlet(ClientV1RestServlet):
+    PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
+
+    def __init__(self, hs):
+        super(PresenceStatusStubServlet, self).__init__(hs)
+        self.http_client = hs.get_simple_http_client()
+        self.auth = hs.get_auth()
+        self.main_uri = hs.config.worker_main_http_uri
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, user_id):
+        # Pass through the auth headers, if any, in case the access token
+        # is there.
+        auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
+        headers = {
+            "Authorization": auth_headers,
+        }
+        result = yield self.http_client.get_json(
+            self.main_uri + request.uri,
+            headers=headers,
+        )
+        defer.returnValue((200, result))
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, user_id):
+        yield self.auth.get_user_by_req(request)
+        defer.returnValue((200, {}))
+
+
 class KeyUploadServlet(RestServlet):
     PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
 
@@ -119,10 +148,7 @@ class FrontendProxySlavedStore(
 
 
 class FrontendProxyServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = FrontendProxySlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -136,6 +162,12 @@ class FrontendProxyServer(HomeServer):
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
                     KeyUploadServlet(self).register(resource)
+
+                    # If presence is disabled, use the stub servlet that does
+                    # not allow sending presence
+                    if not self.config.use_presence:
+                        PresenceStatusStubServlet(self).register(resource)
+
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,
@@ -154,7 +186,8 @@ class FrontendProxyServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
+            reactor=self.get_reactor()
         )
 
         logger.info("Synapse client reader now listening on port %d", port)
@@ -176,7 +209,7 @@ class FrontendProxyServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -209,11 +242,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = FrontendProxyServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 714f98a3e0..3eb5b663de 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -18,34 +18,51 @@ import logging
 import os
 import sys
 
+from six import iteritems
+
+from prometheus_client import Gauge
+
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.web.resource import EncodingResourceWrapper, NoResource
+from twisted.web.server import GzipEncoderFactory
+from twisted.web.static import File
+
 import synapse
 import synapse.config.logger
 from synapse import events
-from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
-    LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
-    STATIC_PREFIX, WEB_CLIENT_PREFIX
+from synapse.api.urls import (
+    CONTENT_REPO_PREFIX,
+    FEDERATION_PREFIX,
+    LEGACY_MEDIA_PREFIX,
+    MEDIA_PREFIX,
+    SERVER_KEY_PREFIX,
+    SERVER_KEY_V2_PREFIX,
+    STATIC_PREFIX,
+    WEB_CLIENT_PREFIX,
+)
 from synapse.app import _base
-from synapse.app._base import quit_with_error, listen_ssl, listen_tcp
+from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.federation.transport.server import TransportLayerServer
-from synapse.module_api import ModuleApi
 from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import RootRedirect
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
-from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
-    check_requirements
-from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
+from synapse.module_api import ModuleApi
+from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
+from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest import ClientRestResource
 from synapse.rest.key.v1.server_key_resource import LocalKey
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer
-from synapse.storage import are_all_users_on_domain
+from synapse.storage import DataStore, are_all_users_on_domain
 from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
 from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
 from synapse.util.caches import CACHE_SIZE_FACTOR
@@ -55,11 +72,6 @@ from synapse.util.manhole import manhole
 from synapse.util.module_loader import load_module
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from twisted.application import service
-from twisted.internet import defer, reactor
-from twisted.web.resource import EncodingResourceWrapper, NoResource
-from twisted.web.server import GzipEncoderFactory
-from twisted.web.static import File
 
 logger = logging.getLogger("synapse.app.homeserver")
 
@@ -99,6 +111,8 @@ def build_resource_for_web_client(hs):
 
 
 class SynapseHomeServer(HomeServer):
+    DATASTORE_CLASS = DataStore
+
     def _listener_http(self, config, listener_config):
         port = listener_config["port"]
         bind_addresses = listener_config["bind_addresses"]
@@ -266,7 +280,7 @@ class SynapseHomeServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -290,6 +304,11 @@ class SynapseHomeServer(HomeServer):
             quit_with_error(e.message)
 
 
+# Gauges to expose monthly active user control metrics
+current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
+
+
 def setup(config_options):
     """
     Args:
@@ -318,14 +337,10 @@ def setup(config_options):
     # check any extra requirements we have now we have a config
     check_requirements(config)
 
-    version_string = "Synapse/" + get_version_string(synapse)
-
-    logger.info("Server hostname: %s", config.server_name)
-    logger.info("Server version: %s", version_string)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     database_engine = create_engine(config.database_config)
     config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
@@ -334,21 +349,22 @@ def setup(config_options):
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
-        version_string=version_string,
+        version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
     )
 
     logger.info("Preparing database: %s...", config.database_config['name'])
 
     try:
-        db_conn = hs.get_db_conn(run_new_connection=False)
-        prepare_database(db_conn, database_engine, config=config)
-        database_engine.on_new_connection(db_conn)
+        with hs.get_db_conn(run_new_connection=False) as db_conn:
+            prepare_database(db_conn, database_engine, config=config)
+            database_engine.on_new_connection(db_conn)
 
-        hs.run_startup_checks(db_conn, database_engine)
+            hs.run_startup_checks(db_conn, database_engine)
 
-        db_conn.commit()
+            db_conn.commit()
     except UpgradeDatabaseException:
         sys.stderr.write(
             "\nFailed to upgrade database.\n"
@@ -423,6 +439,9 @@ def run(hs):
     # currently either 0 or 1
     stats_process = []
 
+    def start_phone_stats_home():
+        return run_as_background_process("phone_stats_home", phone_stats_home)
+
     @defer.inlineCallbacks
     def phone_stats_home():
         logger.info("Gathering stats for reporting")
@@ -440,7 +459,7 @@ def run(hs):
         stats["total_nonbridged_users"] = total_nonbridged_users
 
         daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
-        for name, count in daily_user_type_results.iteritems():
+        for name, count in iteritems(daily_user_type_results):
             stats["daily_user_type_" + name] = count
 
         room_count = yield hs.get_datastore().get_room_count()
@@ -451,7 +470,7 @@ def run(hs):
         stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
 
         r30_results = yield hs.get_datastore().count_r30_users()
-        for name, count in r30_results.iteritems():
+        for name, count in iteritems(r30_results):
             stats["r30_users_" + name] = count
 
         daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
@@ -494,16 +513,41 @@ def run(hs):
             )
 
     def generate_user_daily_visit_stats():
-        hs.get_datastore().generate_user_daily_visits()
+        return run_as_background_process(
+            "generate_user_daily_visits",
+            hs.get_datastore().generate_user_daily_visits,
+        )
 
     # Rather than update on per session basis, batch up the requests.
     # If you increase the loop period, the accuracy of user_daily_visits
     # table will decrease
     clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
 
+    # monthly active user limiting functionality
+    clock.looping_call(
+        hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
+    )
+    hs.get_datastore().reap_monthly_active_users()
+
+    @defer.inlineCallbacks
+    def generate_monthly_active_users():
+        count = 0
+        if hs.config.limit_usage_by_mau:
+            count = yield hs.get_datastore().get_monthly_active_count()
+        current_mau_gauge.set(float(count))
+        max_mau_gauge.set(float(hs.config.max_mau_value))
+
+    hs.get_datastore().initialise_reserved_users(
+        hs.config.mau_limits_reserved_threepids
+    )
+    generate_monthly_active_users()
+    if hs.config.limit_usage_by_mau:
+        clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+    # End of monthly active user settings
+
     if hs.config.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
-        clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
+        clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
 
         # We need to defer this init for the cases that we daemonize
         # otherwise the process ID we get is that of the non-daemon process
@@ -511,7 +555,7 @@ def run(hs):
 
         # We wait 5 minutes to send the first set of stats as the server can
         # be quite busy the first few minutes
-        clock.call_later(5 * 60, phone_stats_home)
+        clock.call_later(5 * 60, start_phone_stats_home)
 
     if hs.config.daemonize and hs.config.print_pidfile:
         print (hs.config.pid_file)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 006bba80a8..e3dbb3b4e6 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -16,11 +16,12 @@
 import logging
 import sys
 
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+
 import synapse
 from synapse import events
-from synapse.api.urls import (
-    CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
-)
+from synapse.api.urls import CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
 from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
@@ -33,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer
@@ -43,8 +44,6 @@ from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.media_repository")
 
@@ -53,7 +52,7 @@ class MediaRepositorySlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedClientIpStore,
-    TransactionStore,
+    SlavedTransactionStore,
     BaseSlavedStore,
     MediaRepositoryStore,
 ):
@@ -61,10 +60,7 @@ class MediaRepositorySlavedStore(
 
 
 class MediaRepositoryServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = MediaRepositorySlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -118,7 +114,7 @@ class MediaRepositoryServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -156,11 +152,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = MediaRepositoryServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 64df47f9cc..244c604de9 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -16,6 +16,9 @@
 import logging
 import sys
 
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
 import synapse
 from synapse import events
 from synapse.app import _base
@@ -37,8 +40,6 @@ from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import defer, reactor
-from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.pusher")
 
@@ -77,10 +78,7 @@ class PusherSlaveStore(
 
 
 class PusherServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = PusherSlaveStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = PusherSlaveStore
 
     def remove_pusher(self, app_id, push_key, user_id):
         self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
@@ -128,7 +126,7 @@ class PusherServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -147,8 +145,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
         self.pusher_pool = hs.get_pusherpool()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
         run_in_background(self.poke_pushers, stream_name, token, rows)
 
     @defer.inlineCallbacks
@@ -161,11 +160,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
                     else:
                         yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
             elif stream_name == "events":
-                yield self.pusher_pool.on_new_notifications(
+                self.pusher_pool.on_new_notifications(
                     token, token,
                 )
             elif stream_name == "receipts":
-                yield self.pusher_pool.on_new_receipts(
+                self.pusher_pool.on_new_receipts(
                     token, token, set(row.room_id for row in rows)
                 )
         except Exception:
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 6808d6d3e0..6662340797 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -17,6 +17,11 @@ import contextlib
 import logging
 import sys
 
+from six import iteritems
+
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
 import synapse
 from synapse.api.constants import EventTypes
 from synapse.app import _base
@@ -36,12 +41,12 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.groups import SlavedGroupServerStore
 from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.groups import SlavedGroupServerStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@@ -50,16 +55,11 @@ from synapse.rest.client.v2_alpha import sync
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
-from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
-from twisted.internet import defer, reactor
-from twisted.web.resource import NoResource
-
-from six import iteritems
 
 logger = logging.getLogger("synapse.app.synchrotron")
 
@@ -80,9 +80,7 @@ class SynchrotronSlavedStore(
     RoomStore,
     BaseSlavedStore,
 ):
-    did_forget = (
-        RoomMemberStore.__dict__["did_forget"]
-    )
+    pass
 
 
 UPDATE_SYNCING_USERS_MS = 10 * 1000
@@ -116,7 +114,10 @@ class SynchrotronPresence(object):
         logger.info("Presence process_id is %r", self.process_id)
 
     def send_user_sync(self, user_id, is_syncing, last_sync_ms):
-        self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
+        if self.hs.config.use_presence:
+            self.hs.get_tcp_replication().send_user_sync(
+                user_id, is_syncing, last_sync_ms
+            )
 
     def mark_as_coming_online(self, user_id):
         """A user has started syncing. Send a UserSync to the master, unless they
@@ -213,10 +214,13 @@ class SynchrotronPresence(object):
         yield self.notify_from_replication(states, stream_id)
 
     def get_currently_syncing_users(self):
-        return [
-            user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
-            if count > 0
-        ]
+        if self.hs.config.use_presence:
+            return [
+                user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
+                if count > 0
+            ]
+        else:
+            return set()
 
 
 class SynchrotronTyping(object):
@@ -245,10 +249,7 @@ class SynchrotronApplicationService(object):
 
 
 class SynchrotronServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = SynchrotronSlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -305,7 +306,7 @@ class SynchrotronServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -334,8 +335,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
         self.presence_handler = hs.get_presence_handler()
         self.notifier = hs.get_notifier()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
         run_in_background(self.process_and_notify, stream_name, token, rows)
 
     def get_streams_to_replicate(self):
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 56ae086128..d658f967ba 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -16,16 +16,19 @@
 
 import argparse
 import collections
+import errno
 import glob
 import os
 import os.path
 import signal
 import subprocess
 import sys
-import yaml
-import errno
 import time
 
+from six import iteritems
+
+import yaml
+
 SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
 
 GREEN = "\x1b[1;32m"
@@ -172,7 +175,7 @@ def main():
         os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
 
     cache_factors = config.get("synctl_cache_factors", {})
-    for cache_name, factor in cache_factors.iteritems():
+    for cache_name, factor in iteritems(cache_factors):
         os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
 
     worker_configfiles = []
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index ada1c13cec..96ffcaf073 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -17,6 +17,9 @@
 import logging
 import sys
 
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
 import synapse
 from synapse import events
 from synapse.app import _base
@@ -43,8 +46,6 @@ from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor, defer
-from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.user_dir")
 
@@ -93,10 +94,7 @@ class UserDirectorySlaveStore(
 
 
 class UserDirectoryServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = UserDirectorySlaveStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -150,7 +148,7 @@ class UserDirectoryServer(HomeServer):
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warn(("Metrics listener configured, but "
-                                 "collect_metrics is not enabled!"))
+                                 "enable_metrics is not True!"))
                 else:
                     _base.listen_metrics(listener["bind_addresses"],
                                          listener["port"])
@@ -168,8 +166,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
         super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
         self.user_directory = hs.get_user_directory_handler()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(UserDirectoryReplicationHandler, self).on_rdata(
+        yield super(UserDirectoryReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         if stream_name == "current_state_deltas":
@@ -213,11 +212,13 @@ def start(config_options):
     config.update_user_directory = True
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ps = UserDirectoryServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,