summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-08-22 14:28:55 +0100
committerRichard van der Hoff <richard@matrix.org>2018-08-22 14:28:55 +0100
commit48fec6753639d32445f043f720456920532eab97 (patch)
tree5d182f245ec0acfc4fc2169e8cc1ce65d0ca8103 /synapse/app
parentMerge pull request #3685 from matrix-org/revert-3677-master (diff)
parentchangelog for 0.33.3 (diff)
downloadsynapse-48fec6753639d32445f043f720456920532eab97.tar.xz
Merge tag 'v0.33.3'
Features
--------

- Add support for the SNI extension to federation TLS connections. Thanks to @vojeroen! ([\#3439](https://github.com/matrix-org/synapse/issues/3439))
- Add /_media/r0/config ([\#3184](https://github.com/matrix-org/synapse/issues/3184))
- speed up /members API and add `at` and `membership` params as per MSC1227 ([\#3568](https://github.com/matrix-org/synapse/issues/3568))
- implement `summary` block in /sync response as per MSC688 ([\#3574](https://github.com/matrix-org/synapse/issues/3574))
- Add lazy-loading support to /messages as per MSC1227 ([\#3589](https://github.com/matrix-org/synapse/issues/3589))
- Add ability to limit number of monthly active users on the server ([\#3633](https://github.com/matrix-org/synapse/issues/3633))
- Support more federation endpoints on workers ([\#3653](https://github.com/matrix-org/synapse/issues/3653))
- Basic support for room versioning ([\#3654](https://github.com/matrix-org/synapse/issues/3654))
- Ability to disable client/server Synapse via conf toggle ([\#3655](https://github.com/matrix-org/synapse/issues/3655))
- Ability to whitelist specific threepids against monthly active user limiting ([\#3662](https://github.com/matrix-org/synapse/issues/3662))
- Add some metrics for the appservice and federation event sending loops ([\#3664](https://github.com/matrix-org/synapse/issues/3664))
- Where server is disabled, block ability for locked out users to read new messages ([\#3670](https://github.com/matrix-org/synapse/issues/3670))
- set admin uri via config, to be used in error messages where the user should contact the administrator ([\#3687](https://github.com/matrix-org/synapse/issues/3687))
- Synapse's presence functionality can now be disabled with the "use_presence" configuration option. ([\#3694](https://github.com/matrix-org/synapse/issues/3694))
- For resource limit blocked users, prevent writing into rooms ([\#3708](https://github.com/matrix-org/synapse/issues/3708))

Bugfixes
--------

- Fix occasional glitches in the synapse_event_persisted_position metric ([\#3658](https://github.com/matrix-org/synapse/issues/3658))
- Fix bug on deleting 3pid when using identity servers that don't support unbind API ([\#3661](https://github.com/matrix-org/synapse/issues/3661))
- Make the tests pass on Twisted < 18.7.0 ([\#3676](https://github.com/matrix-org/synapse/issues/3676))
- Don’t ship recaptcha_ajax.js, use it directly from Google ([\#3677](https://github.com/matrix-org/synapse/issues/3677))
- Fixes test_reap_monthly_active_users so it passes under postgres ([\#3681](https://github.com/matrix-org/synapse/issues/3681))
- Fix mau blocking calulation bug on login ([\#3689](https://github.com/matrix-org/synapse/issues/3689))
- Fix missing yield in synapse.storage.monthly_active_users.initialise_reserved_users ([\#3692](https://github.com/matrix-org/synapse/issues/3692))
- Improve HTTP request logging to include all requests ([\#3700](https://github.com/matrix-org/synapse/issues/3700))
- Avoid timing out requests while we are streaming back the response ([\#3701](https://github.com/matrix-org/synapse/issues/3701))
- Support more federation endpoints on workers ([\#3705](https://github.com/matrix-org/synapse/issues/3705), [\#3713](https://github.com/matrix-org/synapse/issues/3713))
- Fix "Starting db txn 'get_all_updated_receipts' from sentinel context" warning ([\#3710](https://github.com/matrix-org/synapse/issues/3710))
- Fix bug where `state_cache` cache factor ignored environment variables ([\#3719](https://github.com/matrix-org/synapse/issues/3719))
- Fix bug in v0.33.3rc1 which caused infinite loops and OOMs ([\#3723](https://github.com/matrix-org/synapse/issues/3723))
- Fix bug introduced in v0.33.3rc1 which made the ToS give a 500 error ([\#3732](https://github.com/matrix-org/synapse/issues/3732))

Deprecations and Removals
-------------------------

- The Shared-Secret registration method of the legacy v1/register REST endpoint has been removed. For a replacement, please see [the admin/register API documentation](https://github.com/matrix-org/synapse/blob/master/docs/admin_api/register_api.rst). ([\#3703](https://github.com/matrix-org/synapse/issues/3703))

Internal Changes
----------------

- The test suite now can run under PostgreSQL. ([\#3423](https://github.com/matrix-org/synapse/issues/3423))
- Refactor HTTP replication endpoints to reduce code duplication ([\#3632](https://github.com/matrix-org/synapse/issues/3632))
- Tests now correctly execute on Python 3. ([\#3647](https://github.com/matrix-org/synapse/issues/3647))
- Sytests can now be run inside a Docker container. ([\#3660](https://github.com/matrix-org/synapse/issues/3660))
- Port over enough to Python 3 to allow the sytests to start. ([\#3668](https://github.com/matrix-org/synapse/issues/3668))
- Update docker base image from alpine 3.7 to 3.8. ([\#3669](https://github.com/matrix-org/synapse/issues/3669))
- Rename synapse.util.async to synapse.util.async_helpers to mitigate async becoming a keyword on Python 3.7. ([\#3678](https://github.com/matrix-org/synapse/issues/3678))
- Synapse's tests are now formatted with the black autoformatter. ([\#3679](https://github.com/matrix-org/synapse/issues/3679))
- Implemented a new testing base class to reduce test boilerplate. ([\#3684](https://github.com/matrix-org/synapse/issues/3684))
- Rename MAU prometheus metrics ([\#3690](https://github.com/matrix-org/synapse/issues/3690))
- add new error type ResourceLimit ([\#3707](https://github.com/matrix-org/synapse/issues/3707))
- Logcontexts for replication command handlers ([\#3709](https://github.com/matrix-org/synapse/issues/3709))
- Update admin register API documentation to reference a real user ID. ([\#3712](https://github.com/matrix-org/synapse/issues/3712))
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/_base.py6
-rw-r--r--synapse/app/appservice.py3
-rw-r--r--synapse/app/client_reader.py6
-rw-r--r--synapse/app/event_creator.py6
-rw-r--r--synapse/app/federation_reader.py18
-rw-r--r--synapse/app/federation_sender.py11
-rw-r--r--synapse/app/frontend_proxy.py41
-rwxr-xr-xsynapse/app/homeserver.py20
-rw-r--r--synapse/app/media_repository.py6
-rw-r--r--synapse/app/pusher.py7
-rw-r--r--synapse/app/synchrotron.py19
-rw-r--r--synapse/app/user_dir.py5
12 files changed, 118 insertions, 30 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 391bd14c5c..7c866e246a 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port):
         logger.info("Metrics now reporting on %s:%d", host, port)
 
 
-def listen_tcp(bind_addresses, port, factory, backlog=50):
+def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
     """
     Create a TCP socket for a port and several addresses
     """
@@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50):
             check_bind_error(e, address, bind_addresses)
 
 
-def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
+def listen_ssl(
+    bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
+):
     """
     Create an SSL socket for a port and several addresses
     """
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 9a37384fb7..3348a8ec6d 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -117,8 +117,9 @@ class ASReplicationHandler(ReplicationClientHandler):
         super(ASReplicationHandler, self).__init__(hs.get_datastore())
         self.appservice_handler = hs.get_application_service_handler()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
 
         if stream_name == "events":
             max_stream_id = self.store.get_room_max_stream_ordering()
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index e2c91123db..ab79a45646 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import (
     JoinedRoomMemberListRestServlet,
@@ -66,7 +66,7 @@ class ClientReaderSlavedStore(
     DirectoryStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
-    TransactionStore,
+    SlavedTransactionStore,
     SlavedClientIpStore,
     BaseSlavedStore,
 ):
@@ -168,11 +168,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = ClientReaderServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 374f115644..03d39968a8 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -43,7 +43,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import (
     JoinRoomAliasServlet,
@@ -63,7 +63,7 @@ logger = logging.getLogger("synapse.app.event_creator")
 
 class EventCreatorSlavedStore(
     DirectoryStore,
-    TransactionStore,
+    SlavedTransactionStore,
     SlavedProfileStore,
     SlavedAccountDataStore,
     SlavedPusherStore,
@@ -174,11 +174,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = EventCreatorServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 7af00b8bcf..7d8105778d 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -32,11 +32,17 @@ from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -49,11 +55,17 @@ logger = logging.getLogger("synapse.app.federation_reader")
 
 
 class FederationReaderSlavedStore(
+    SlavedAccountDataStore,
+    SlavedProfileStore,
+    SlavedApplicationServiceStore,
+    SlavedPusherStore,
+    SlavedPushRuleStore,
+    SlavedReceiptsStore,
     SlavedEventStore,
     SlavedKeyStore,
     RoomStore,
     DirectoryStore,
-    TransactionStore,
+    SlavedTransactionStore,
     BaseSlavedStore,
 ):
     pass
@@ -143,11 +155,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = FederationReaderServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 18469013fa..d59007099b 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -36,11 +36,11 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
@@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender")
 
 
 class FederationSenderSlaveStore(
-    SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+    SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
     SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
 ):
     def __init__(self, db_conn, hs):
@@ -144,8 +144,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
         super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
         self.send_handler = FederationSenderHandler(hs, self)
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(FederationSenderReplicationHandler, self).on_rdata(
+        yield super(FederationSenderReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         self.send_handler.process_replication_rows(stream_name, token, rows)
@@ -186,11 +187,13 @@ def start(config_options):
     config.send_federation = True
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ps = FederationSenderServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index b5f78f4640..8d484c1cd4 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -38,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
 from synapse.rest.client.v2_alpha._base import client_v2_patterns
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -49,6 +50,35 @@ from synapse.util.versionstring import get_version_string
 logger = logging.getLogger("synapse.app.frontend_proxy")
 
 
+class PresenceStatusStubServlet(ClientV1RestServlet):
+    PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
+
+    def __init__(self, hs):
+        super(PresenceStatusStubServlet, self).__init__(hs)
+        self.http_client = hs.get_simple_http_client()
+        self.auth = hs.get_auth()
+        self.main_uri = hs.config.worker_main_http_uri
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, user_id):
+        # Pass through the auth headers, if any, in case the access token
+        # is there.
+        auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
+        headers = {
+            "Authorization": auth_headers,
+        }
+        result = yield self.http_client.get_json(
+            self.main_uri + request.uri,
+            headers=headers,
+        )
+        defer.returnValue((200, result))
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, user_id):
+        yield self.auth.get_user_by_req(request)
+        defer.returnValue((200, {}))
+
+
 class KeyUploadServlet(RestServlet):
     PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
 
@@ -135,6 +165,12 @@ class FrontendProxyServer(HomeServer):
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
                     KeyUploadServlet(self).register(resource)
+
+                    # If presence is disabled, use the stub servlet that does
+                    # not allow sending presence
+                    if not self.config.use_presence:
+                        PresenceStatusStubServlet(self).register(resource)
+
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,
@@ -153,7 +189,8 @@ class FrontendProxyServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
+            reactor=self.get_reactor()
         )
 
         logger.info("Synapse client reader now listening on port %d", port)
@@ -208,11 +245,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = FrontendProxyServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index fba51c26e8..005921dcf7 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -303,8 +303,8 @@ class SynapseHomeServer(HomeServer):
 
 
 # Gauges to expose monthly active user control metrics
-current_mau_gauge = Gauge("synapse_admin_current_mau", "Current MAU")
-max_mau_value_gauge = Gauge("synapse_admin_max_mau_value", "MAU Limit")
+current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
 
 
 def setup(config_options):
@@ -338,6 +338,7 @@ def setup(config_options):
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     database_engine = create_engine(config.database_config)
     config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
@@ -346,6 +347,7 @@ def setup(config_options):
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
@@ -519,17 +521,27 @@ def run(hs):
     # table will decrease
     clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
 
+    # monthly active user limiting functionality
+    clock.looping_call(
+        hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
+    )
+    hs.get_datastore().reap_monthly_active_users()
+
     @defer.inlineCallbacks
     def generate_monthly_active_users():
         count = 0
         if hs.config.limit_usage_by_mau:
-            count = yield hs.get_datastore().count_monthly_users()
+            count = yield hs.get_datastore().get_monthly_active_count()
         current_mau_gauge.set(float(count))
-        max_mau_value_gauge.set(float(hs.config.max_mau_value))
+        max_mau_gauge.set(float(hs.config.max_mau_value))
 
+    hs.get_datastore().initialise_reserved_users(
+        hs.config.mau_limits_reserved_threepids
+    )
     generate_monthly_active_users()
     if hs.config.limit_usage_by_mau:
         clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+    # End of monthly active user settings
 
     if hs.config.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 749bbf37d0..fd1f6cbf7e 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer
@@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedClientIpStore,
-    TransactionStore,
+    SlavedTransactionStore,
     BaseSlavedStore,
     MediaRepositoryStore,
 ):
@@ -155,11 +155,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = MediaRepositoryServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 9295a51d5b..a4fc7e91fa 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -148,8 +148,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
         self.pusher_pool = hs.get_pusherpool()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
         run_in_background(self.poke_pushers, stream_name, token, rows)
 
     @defer.inlineCallbacks
@@ -162,11 +163,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
                     else:
                         yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
             elif stream_name == "events":
-                yield self.pusher_pool.on_new_notifications(
+                self.pusher_pool.on_new_notifications(
                     token, token,
                 )
             elif stream_name == "receipts":
-                yield self.pusher_pool.on_new_receipts(
+                self.pusher_pool.on_new_receipts(
                     token, token, set(row.room_id for row in rows)
                 )
         except Exception:
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index e201f18efd..27e1998660 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -114,7 +114,10 @@ class SynchrotronPresence(object):
         logger.info("Presence process_id is %r", self.process_id)
 
     def send_user_sync(self, user_id, is_syncing, last_sync_ms):
-        self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
+        if self.hs.config.use_presence:
+            self.hs.get_tcp_replication().send_user_sync(
+                user_id, is_syncing, last_sync_ms
+            )
 
     def mark_as_coming_online(self, user_id):
         """A user has started syncing. Send a UserSync to the master, unless they
@@ -211,10 +214,13 @@ class SynchrotronPresence(object):
         yield self.notify_from_replication(states, stream_id)
 
     def get_currently_syncing_users(self):
-        return [
-            user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
-            if count > 0
-        ]
+        if self.hs.config.use_presence:
+            return [
+                user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
+                if count > 0
+            ]
+        else:
+            return set()
 
 
 class SynchrotronTyping(object):
@@ -332,8 +338,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
         self.presence_handler = hs.get_presence_handler()
         self.notifier = hs.get_notifier()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
         run_in_background(self.process_and_notify, stream_name, token, rows)
 
     def get_streams_to_replicate(self):
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 637a89530a..1388a42b59 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -169,8 +169,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
         super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
         self.user_directory = hs.get_user_directory_handler()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(UserDirectoryReplicationHandler, self).on_rdata(
+        yield super(UserDirectoryReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         if stream_name == "current_state_deltas":
@@ -214,11 +215,13 @@ def start(config_options):
     config.update_user_directory = True
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ps = UserDirectoryServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,