Diffstat (limited to 'synapse/app')
-rw-r--r--  synapse/app/appservice.py        |  58
-rw-r--r--  synapse/app/client_reader.py     |  34
-rw-r--r--  synapse/app/federation_reader.py |  32
-rw-r--r--  synapse/app/federation_sender.py | 138
-rwxr-xr-x  synapse/app/homeserver.py        |  24
-rw-r--r--  synapse/app/media_repository.py  |  34
-rw-r--r--  synapse/app/pusher.py            | 133
-rw-r--r--  synapse/app/synchrotron.py       | 350
-rwxr-xr-x  synapse/app/synctl.py            |  50
9 files changed, 426 insertions, 427 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 1900930053..9a476efa63 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -26,17 +26,17 @@ from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
 
 from synapse import events
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -120,30 +120,25 @@ class AppserviceServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-        appservice_handler = self.get_application_service_handler()
-
-        @defer.inlineCallbacks
-        def replicate(results):
-            stream = results.get("events")
-            if stream:
-                max_stream_id = stream["position"]
-                yield appservice_handler.notify_interested_services(max_stream_id)
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-                replicate(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(30)
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return ASReplicationHandler(self)
+
+
+class ASReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(ASReplicationHandler, self).__init__(hs.get_datastore())
+        self.appservice_handler = hs.get_application_service_handler()
+
+    def on_rdata(self, stream_name, token, rows):
+        super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+
+        if stream_name == "events":
+            max_stream_id = self.store.get_room_max_stream_ordering()
+            preserve_fn(
+                self.appservice_handler.notify_interested_services
+            )(max_stream_id)
 
 
 def start(config_options):
@@ -157,7 +152,7 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.appservice"
 
-    setup_logging(config.worker_log_config, config.worker_log_file)
+    setup_logging(config, use_worker_options=True)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
@@ -187,7 +182,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
@@ -195,7 +194,6 @@ def start(config_options):
             reactor.run()
 
     def start():
-        ps.replicate()
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
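
Every worker file in this commit follows the same conversion seen above: the polling replicate() loop is deleted, build_tcp_replication() returns a ReplicationClientHandler (or a subclass), and stream updates arrive as on_rdata(stream_name, token, rows) callbacks. A minimal standalone sketch of that shape, using stand-in stubs rather than the real Synapse classes (only the method names visible in this diff are assumed):

    class ReplicationClientHandler(object):
        """Stand-in for synapse.replication.tcp.client.ReplicationClientHandler."""
        def __init__(self, store):
            self.store = store

        def on_rdata(self, stream_name, token, rows):
            # The base implementation advances the store's stream positions;
            # modelled here as a plain dict update.
            self.store.positions[stream_name] = token


    class ASReplicationHandler(ReplicationClientHandler):
        """Pokes the appservice handler whenever the events stream advances."""
        def __init__(self, store, appservice_handler):
            super(ASReplicationHandler, self).__init__(store)
            self.appservice_handler = appservice_handler

        def on_rdata(self, stream_name, token, rows):
            super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
            if stream_name == "events":
                self.appservice_handler.notify_interested_services(token)


    class _Store(object):
        def __init__(self):
            self.positions = {}

    class _AppserviceHandler(object):
        def notify_interested_services(self, max_stream_id):
            print("notify up to %d" % (max_stream_id,))

    handler = ASReplicationHandler(_Store(), _AppserviceHandler())
    handler.on_rdata("events", 42, rows=[])  # -> notify up to 42

The real handler fires the notification through preserve_fn so the logcontext rules described in the run() comments hold; the sketch leaves that out.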
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 4d081eccd1..9b72c649ac 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -29,13 +29,14 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import PublicRoomListRestServlet
 from synapse.server import HomeServer
 from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -44,7 +45,7 @@ from synapse.crypto import context_factory
 from synapse import events
 
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -63,6 +64,7 @@ class ClientReaderSlavedStore(
     DirectoryStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
+    TransactionStore,
     BaseSlavedStore,
     ClientIpStore,  # After BaseSlavedStore because the constructor is different
 ):
@@ -143,21 +145,10 @@ class ClientReaderServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
+        self.get_tcp_replication().start_replication(self)
 
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(5)
+    def build_tcp_replication(self):
+        return ReplicationClientHandler(self.get_datastore())
 
 
 def start(config_options):
@@ -171,7 +162,7 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.client_reader"
 
-    setup_logging(config.worker_log_config, config.worker_log_file)
+    setup_logging(config, use_worker_options=True)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
@@ -193,7 +184,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
@@ -203,7 +198,6 @@ def start(config_options):
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
-        ss.replicate()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 90a4816753..eb392e1c9d 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -27,11 +27,11 @@ from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -42,7 +42,7 @@ from synapse.crypto import context_factory
 from synapse import events
 
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -134,21 +134,10 @@ class FederationReaderServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
+        self.get_tcp_replication().start_replication(self)
 
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(5)
+    def build_tcp_replication(self):
+        return ReplicationClientHandler(self.get_datastore())
 
 
 def start(config_options):
@@ -162,7 +151,7 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.federation_reader"
 
-    setup_logging(config.worker_log_config, config.worker_log_file)
+    setup_logging(config, use_worker_options=True)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
@@ -184,7 +173,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
@@ -194,7 +187,6 @@ def start(config_options):
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
-        ss.replicate()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 411e47d98d..145c01f3a3 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -31,11 +31,12 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
-from synapse.util.async import sleep
+from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -50,7 +51,6 @@ from daemonize import Daemonize
 import sys
 import logging
 import gc
-import ujson as json
 
 logger = logging.getLogger("synapse.app.appservice")
 
@@ -59,7 +59,28 @@ class FederationSenderSlaveStore(
     SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
     SlavedRegistrationStore, SlavedDeviceStore,
 ):
-    pass
+    def __init__(self, db_conn, hs):
+        super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
+
+        # We pull out the current federation stream position now so that we
+        # always have a known value for the federation position in memory,
+        # and don't have to bounce via a deferred when the replication
+        # streams start.
+        self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
+
+    def _get_federation_out_pos(self, db_conn):
+        sql = (
+            "SELECT stream_id FROM federation_stream_position"
+            " WHERE type = ?"
+        )
+        sql = self.database_engine.convert_param_style(sql)
+
+        txn = db_conn.cursor()
+        txn.execute(sql, ("federation",))
+        rows = txn.fetchall()
+        txn.close()
+
+        return rows[0][0] if rows else -1
 
 
 class FederationSenderServer(HomeServer):
@@ -127,26 +148,27 @@ class FederationSenderServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-        send_handler = FederationSenderHandler(self)
-
-        send_handler.on_start()
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args.update((yield send_handler.stream_positions()))
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-                yield send_handler.process_replication(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(30)
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return FederationSenderReplicationHandler(self)
+
+
+class FederationSenderReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
+        self.send_handler = FederationSenderHandler(hs, self)
+
+    def on_rdata(self, stream_name, token, rows):
+        super(FederationSenderReplicationHandler, self).on_rdata(
+            stream_name, token, rows
+        )
+        self.send_handler.process_replication_rows(stream_name, token, rows)
+
+    def get_streams_to_replicate(self):
+        args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
+        args.update(self.send_handler.stream_positions())
+        return args
 
 
 def start(config_options):
@@ -160,7 +182,7 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.federation_sender"
 
-    setup_logging(config.worker_log_config, config.worker_log_file)
+    setup_logging(config, use_worker_options=True)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
@@ -193,7 +215,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
@@ -201,7 +227,6 @@ def start(config_options):
             reactor.run()
 
     def start():
-        ps.replicate()
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
@@ -225,9 +250,15 @@ class FederationSenderHandler(object):
     """Processes the replication stream and forwards the appropriate entries
     to the federation sender.
     """
-    def __init__(self, hs):
+    def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
         self.federation_sender = hs.get_federation_sender()
+        self.replication_client = replication_client
+
+        self.federation_position = self.store.federation_out_pos_startup
+        self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
+
+        self._last_ack = self.federation_position
 
         self._room_serials = {}
         self._room_typing = {}
@@ -239,25 +270,13 @@ class FederationSenderHandler(object):
             self.store.get_room_max_stream_ordering()
         )
 
-    @defer.inlineCallbacks
     def stream_positions(self):
-        stream_id = yield self.store.get_federation_out_pos("federation")
-        defer.returnValue({
-            "federation": stream_id,
-
-            # Ack stuff we've "processed", this should only be called from
-            # one process.
-            "federation_ack": stream_id,
-        })
+        return {"federation": self.federation_position}
 
-    @defer.inlineCallbacks
-    def process_replication(self, result):
+    def process_replication_rows(self, stream_name, token, rows):
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.
-        fed_stream = result.get("federation")
-        if fed_stream:
-            latest_id = int(fed_stream["position"])
-
+        if stream_name == "federation":
+            # The federation stream contains a bunch of different types of
             # rows that need to be handled differently. We parse the rows, put
             # them into the appropriate collection and then send them off.
@@ -268,9 +287,9 @@ class FederationSenderHandler(object):
             device_destinations = set()
 
             # Parse the rows in the stream
-            for row in fed_stream["rows"]:
-                position, typ, content_js = row
-                content = json.loads(content_js)
+            for row in rows:
+                typ = row.type
+                content = row.data
 
                 if typ == send_queue.PRESENCE_TYPE:
                     destination = content["destination"]
@@ -321,16 +340,27 @@ class FederationSenderHandler(object):
             for destination in device_destinations:
                 self.federation_sender.send_device_messages(destination)
 
-            # Record where we are in the stream.
-            yield self.store.update_federation_out_pos(
-                "federation", latest_id
-            )
+            preserve_fn(self.update_token)(token)
 
         # We also need to poke the federation sender when new events happen
-        event_stream = result.get("events")
-        if event_stream:
-            latest_pos = event_stream["position"]
-            self.federation_sender.notify_new_events(latest_pos)
+        elif stream_name == "events":
+            self.federation_sender.notify_new_events(token)
+
+    @defer.inlineCallbacks
+    def update_token(self, token):
+        self.federation_position = token
+
+        # We linearize here to ensure we don't have races updating the token
+        with (yield self._fed_position_linearizer.queue(None)):
+            if self._last_ack < self.federation_position:
+                yield self.store.update_federation_out_pos(
+                    "federation", self.federation_position
+                )
+
+                # We ACK this token over replication so that the master can
+                # drop its in-memory queues
+                self.replication_client.send_federation_ack(self.federation_position)
+                self._last_ack = self.federation_position
 
 
 if __name__ == '__main__':
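
The trickiest hunk here is update_token(): the in-memory federation_position can be bumped again while a previous database write is still in flight, so writes are funnelled through a Linearizer and the ack is only sent once the persisted position has advanced past the last ack. A standalone sketch of just that bookkeeping, assuming callers are already serialized (which is what the Linearizer buys in the real code); persist and ack stand in for update_federation_out_pos and send_federation_ack:

    class FederationPositionTracker(object):
        def __init__(self, start_pos, persist, ack):
            self.federation_position = start_pos
            self._last_ack = start_pos
            self._persist = persist
            self._ack = ack

        def update_token(self, token):
            self.federation_position = token
            if self._last_ack < self.federation_position:
                # Persist before acking, so the master never drops queued
                # entries that we haven't durably recorded.
                self._persist(self.federation_position)
                self._ack(self.federation_position)
                self._last_ack = self.federation_position

    tracker = FederationPositionTracker(
        start_pos=-1,
        persist=lambda pos: None,  # stands in for update_federation_out_pos
        ack=lambda pos: None,      # stands in for send_federation_ack
    )
    tracker.update_token(7)  # persists and acks 7; a repeat call with 7 is a no-op

Note the ordering: acking first would let the master discard its in-memory queues before the position hit the database, losing data on a crash.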
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e0b87468fe..990eb477e5 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -20,6 +20,8 @@ import gc
 import logging
 import os
 import sys
+
+import synapse.config.logger
 from synapse.config._base import ConfigError
 
 from synapse.python_dependencies import (
@@ -50,10 +52,11 @@ from synapse.api.urls import (
 )
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.metrics import register_memory_metrics, get_metrics_for
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
+from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.federation.transport.server import TransportLayerServer
 
 from synapse.util.rlimit import change_resource_limit
@@ -220,6 +223,16 @@ class SynapseHomeServer(HomeServer):
                         ),
                         interface=address
                     )
+            elif listener["type"] == "replication":
+                bind_addresses = listener["bind_addresses"]
+                for address in bind_addresses:
+                    factory = ReplicationStreamProtocolFactory(self)
+                    server_listener = reactor.listenTCP(
+                        listener["port"], factory, interface=address
+                    )
+                    reactor.addSystemEventTrigger(
+                        "before", "shutdown", server_listener.stopListening,
+                    )
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -286,7 +299,7 @@ def setup(config_options):
         # generating config files and shouldn't try to continue.
         sys.exit(0)
 
-    config.setup_logging()
+    synapse.config.logger.setup_logging(config, use_worker_options=False)
 
     # check any extra requirements we have now we have a config
     check_requirements(config)
@@ -454,7 +467,12 @@ def run(hs):
     def in_thread():
         # Uncomment to enable tracing of log context changes.
         # sys.settrace(logcontext_tracer)
-        with LoggingContext("run"):
+
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             change_resource_limit(hs.config.soft_file_limit)
             if hs.config.gc_thresholds:
                 gc.set_threshold(*hs.config.gc_thresholds)
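
The new branch only reads "type", "port" and "bind_addresses" from the listener config, so enabling the TCP replication listener amounts to adding a listener entry with type "replication". A hypothetical entry, shown as the parsed dict the loop above receives (the port number is an illustrative assumption, not something this diff fixes):

    # Hypothetical parsed listener entry; only these three keys are read by
    # the new "replication" branch above.
    listener = {
        "type": "replication",
        "port": 9092,  # illustrative choice, not mandated by the diff
        "bind_addresses": ["127.0.0.1"],
    }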
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index ef17b158a5..26c4416956 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -24,15 +24,16 @@ from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.rest.media.v1.media_repository import MediaRepositoryResource
 from synapse.server import HomeServer
 from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -44,7 +45,7 @@ from synapse.crypto import context_factory
 from synapse import events
 
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -59,6 +60,7 @@ logger = logging.getLogger("synapse.app.media_repository")
 class MediaRepositorySlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
+    TransactionStore,
     BaseSlavedStore,
     MediaRepositoryStore,
     ClientIpStore,
@@ -140,21 +142,10 @@ class MediaRepositoryServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
+        self.get_tcp_replication().start_replication(self)
 
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(5)
+    def build_tcp_replication(self):
+        return ReplicationClientHandler(self.get_datastore())
 
 
 def start(config_options):
@@ -168,7 +159,7 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.media_repository"
 
-    setup_logging(config.worker_log_config, config.worker_log_file)
+    setup_logging(config, use_worker_options=True)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
@@ -190,7 +181,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
@@ -200,7 +195,6 @@ def start(config_options):
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
-        ss.replicate()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 073f2c2489..f9114acfcb 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -27,11 +27,12 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
 from synapse.storage import DataStore
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn, \
+    PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -88,7 +89,6 @@ class PusherSlaveStore(
 
 
 class PusherServer(HomeServer):
-
     def get_db_conn(self, run_new_connection=True):
         # Any param beginning with cp_ is a parameter for adbapi, and should
         # not be passed to the database engine.
@@ -108,16 +108,7 @@ class PusherServer(HomeServer):
         logger.info("Finished setting up.")
 
     def remove_pusher(self, app_id, push_key, user_id):
-        http_client = self.get_simple_http_client()
-        replication_url = self.config.worker_replication_url
-        url = replication_url + "/remove_pushers"
-        return http_client.post_json_get_json(url, {
-            "remove": [{
-                "app_id": app_id,
-                "push_key": push_key,
-                "user_id": user_id,
-            }]
-        })
+        self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -165,73 +156,52 @@ class PusherServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return PusherReplicationHandler(self)
+
+
+class PusherReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(PusherReplicationHandler, self).__init__(hs.get_datastore())
+
+        self.pusher_pool = hs.get_pusherpool()
+
+    def on_rdata(self, stream_name, token, rows):
+        super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+        preserve_fn(self.poke_pushers)(stream_name, token, rows)
+
     @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-        pusher_pool = self.get_pusherpool()
-
-        def stop_pusher(user_id, app_id, pushkey):
-            key = "%s:%s" % (app_id, pushkey)
-            pushers_for_user = pusher_pool.pushers.get(user_id, {})
-            pusher = pushers_for_user.pop(key, None)
-            if pusher is None:
-                return
-            logger.info("Stopping pusher %r / %r", user_id, key)
-            pusher.on_stop()
-
-        def start_pusher(user_id, app_id, pushkey):
-            key = "%s:%s" % (app_id, pushkey)
-            logger.info("Starting pusher %r / %r", user_id, key)
-            return pusher_pool._refresh_pusher(app_id, pushkey, user_id)
-
-        @defer.inlineCallbacks
-        def poke_pushers(results):
-            pushers_rows = set(
-                map(tuple, results.get("pushers", {}).get("rows", []))
+    def poke_pushers(self, stream_name, token, rows):
+        if stream_name == "pushers":
+            for row in rows:
+                if row.deleted:
+                    yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+                else:
+                    yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
+        elif stream_name == "events":
+            yield self.pusher_pool.on_new_notifications(
+                token, token,
             )
-            deleted_pushers_rows = set(
-                map(tuple, results.get("deleted_pushers", {}).get("rows", []))
+        elif stream_name == "receipts":
+            yield self.pusher_pool.on_new_receipts(
+                token, token, set(row.room_id for row in rows)
             )
-            for row in sorted(pushers_rows | deleted_pushers_rows):
-                if row in deleted_pushers_rows:
-                    user_id, app_id, pushkey = row[1:4]
-                    stop_pusher(user_id, app_id, pushkey)
-                elif row in pushers_rows:
-                    user_id = row[1]
-                    app_id = row[5]
-                    pushkey = row[8]
-                    yield start_pusher(user_id, app_id, pushkey)
-
-            stream = results.get("events")
-            if stream and stream["rows"]:
-                min_stream_id = stream["rows"][0][0]
-                max_stream_id = stream["position"]
-                preserve_fn(pusher_pool.on_new_notifications)(
-                    min_stream_id, max_stream_id
-                )
-
-            stream = results.get("receipts")
-            if stream and stream["rows"]:
-                rows = stream["rows"]
-                affected_room_ids = set(row[1] for row in rows)
-                min_stream_id = rows[0][0]
-                max_stream_id = stream["position"]
-                preserve_fn(pusher_pool.on_new_receipts)(
-                    min_stream_id, max_stream_id, affected_room_ids
-                )
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-                poke_pushers(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(30)
+
+    def stop_pusher(self, user_id, app_id, pushkey):
+        key = "%s:%s" % (app_id, pushkey)
+        pushers_for_user = self.pusher_pool.pushers.get(user_id, {})
+        pusher = pushers_for_user.pop(key, None)
+        if pusher is None:
+            return
+        logger.info("Stopping pusher %r / %r", user_id, key)
+        pusher.on_stop()
+
+    def start_pusher(self, user_id, app_id, pushkey):
+        key = "%s:%s" % (app_id, pushkey)
+        logger.info("Starting pusher %r / %r", user_id, key)
+        return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
 
 
 def start(config_options):
@@ -245,7 +215,7 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.pusher"
 
-    setup_logging(config.worker_log_config, config.worker_log_file)
+    setup_logging(config, use_worker_options=True)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
@@ -275,7 +245,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
@@ -283,7 +257,6 @@ def start(config_options):
             reactor.run()
 
     def start():
-        ps.replicate()
         ps.get_pusherpool().start()
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
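
A pattern worth noting across these handlers: stream rows now arrive as objects with named attributes (row.deleted, row.user_id, row.pushkey) instead of the positional JSON arrays each worker used to index by hand (compare the deleted row[1:4] / row[5] / row[8] slicing above). A standalone sketch of the pushers dispatch, with a namedtuple standing in for the real stream row type:

    from collections import namedtuple

    # Stand-in for the pushers stream row; only the attributes used by
    # poke_pushers above are modelled.
    PusherRow = namedtuple("PusherRow", ["user_id", "app_id", "pushkey", "deleted"])

    def poke_pushers(rows, start_pusher, stop_pusher):
        for row in rows:
            if row.deleted:
                stop_pusher(row.user_id, row.app_id, row.pushkey)
            else:
                start_pusher(row.user_id, row.app_id, row.pushkey)

    poke_pushers(
        [PusherRow("@alice:example.com", "example.app", "key1", deleted=False)],
        start_pusher=lambda user_id, app_id, pushkey: None,
        stop_pusher=lambda user_id, app_id, pushkey: None,
    )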
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 3f29595256..d39e3161fe 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -16,11 +16,10 @@
 
 import synapse
 
-from synapse.api.constants import EventTypes, PresenceState
+from synapse.api.constants import EventTypes
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.events import FrozenEvent
 from synapse.handlers.presence import PresenceHandler
 from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
@@ -41,14 +40,14 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
 from synapse.storage.presence import PresenceStore, UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
@@ -63,7 +62,6 @@ import sys
 import logging
 import contextlib
 import gc
-import ujson as json
 
 logger = logging.getLogger("synapse.app.synchrotron")
 
@@ -107,11 +105,11 @@ UPDATE_SYNCING_USERS_MS = 10 * 1000
 
 class SynchrotronPresence(object):
     def __init__(self, hs):
+        self.hs = hs
         self.is_mine_id = hs.is_mine_id
         self.http_client = hs.get_simple_http_client()
         self.store = hs.get_datastore()
         self.user_to_num_current_syncs = {}
-        self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
 
@@ -121,17 +119,52 @@ class SynchrotronPresence(object):
             for state in active_presence
         }
 
-        self.process_id = random_string(16)
-        logger.info("Presence process_id is %r", self.process_id)
+        # user_id -> last_sync_ms. Lists the users that have stopped syncing
+        # but that we haven't yet told the master about.
+        self.users_going_offline = {}
 
-        self._sending_sync = False
-        self._need_to_send_sync = False
-        self.clock.looping_call(
-            self._send_syncing_users_regularly,
-            UPDATE_SYNCING_USERS_MS,
+        self._send_stop_syncing_loop = self.clock.looping_call(
+            self.send_stop_syncing, 10 * 1000
         )
 
-        reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+        self.process_id = random_string(16)
+        logger.info("Presence process_id is %r", self.process_id)
+
+    def send_user_sync(self, user_id, is_syncing, last_sync_ms):
+        self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
+
+    def mark_as_coming_online(self, user_id):
+        """A user has started syncing. Send a UserSync to the master, unless they
+        had recently stopped syncing.
+
+        Args:
+            user_id (str)
+        """
+        going_offline = self.users_going_offline.pop(user_id, None)
+        if not going_offline:
+            # The master does not currently think this user is syncing, so
+            # poke it. (If they had been pending-offline we could skip this,
+            # since we never told the master they stopped.)
+            self.send_user_sync(user_id, True, self.clock.time_msec())
+
+    def mark_as_going_offline(self, user_id):
+        """A user has stopped syncing. We wait before notifying the master as
+        its likely they'll come back soon. This allows us to avoid sending
+        a stopped syncing immediately followed by a started syncing notification
+        to the master
+
+        Args:
+            user_id (str)
+        """
+        self.users_going_offline[user_id] = self.clock.time_msec()
+
+    def send_stop_syncing(self):
+        """Check if there are any users who have stopped syncing a while ago
+        and haven't come back yet. If there are poke the master about them.
+        """
+        now = self.clock.time_msec()
+        for user_id, last_sync_ms in self.users_going_offline.items():
+            if now - last_sync_ms > 10 * 1000:
+                self.users_going_offline.pop(user_id, None)
+                self.send_user_sync(user_id, False, last_sync_ms)
 
     def set_state(self, user, state, ignore_status_msg=False):
         # TODO Hows this supposed to work?
@@ -142,15 +175,14 @@ class SynchrotronPresence(object):
     _get_interested_parties = PresenceHandler._get_interested_parties.__func__
     current_state_for_users = PresenceHandler.current_state_for_users.__func__
 
-    @defer.inlineCallbacks
     def user_syncing(self, user_id, affect_presence):
         if affect_presence:
             curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
             self.user_to_num_current_syncs[user_id] = curr_sync + 1
-            prev_states = yield self.current_state_for_users([user_id])
-            if prev_states[user_id].state == PresenceState.OFFLINE:
-                # TODO: Don't block the sync request on this HTTP hit.
-                yield self._send_syncing_users_now()
+
+            # If we went from no in-flight syncs to some, notify replication
+            if self.user_to_num_current_syncs[user_id] == 1:
+                self.mark_as_coming_online(user_id)
 
         def _end():
             # We check that the user_id is in user_to_num_current_syncs because
@@ -159,6 +191,10 @@ class SynchrotronPresence(object):
             if affect_presence and user_id in self.user_to_num_current_syncs:
                 self.user_to_num_current_syncs[user_id] -= 1
 
+                # If we went from one in-flight sync to none, notify replication
+                if self.user_to_num_current_syncs[user_id] == 0:
+                    self.mark_as_going_offline(user_id)
+
         @contextlib.contextmanager
         def _user_syncing():
             try:
@@ -166,49 +202,7 @@ class SynchrotronPresence(object):
             finally:
                 _end()
 
-        defer.returnValue(_user_syncing())
-
-    @defer.inlineCallbacks
-    def _on_shutdown(self):
-        # When the synchrotron is shutdown tell the master to clear the in
-        # progress syncs for this process
-        self.user_to_num_current_syncs.clear()
-        yield self._send_syncing_users_now()
-
-    def _send_syncing_users_regularly(self):
-        # Only send an update if we aren't in the middle of sending one.
-        if not self._sending_sync:
-            preserve_fn(self._send_syncing_users_now)()
-
-    @defer.inlineCallbacks
-    def _send_syncing_users_now(self):
-        if self._sending_sync:
-            # We don't want to race with sending another update.
-            # Instead we wait for that update to finish and send another
-            # update afterwards.
-            self._need_to_send_sync = True
-            return
-
-        # Flag that we are sending an update.
-        self._sending_sync = True
-
-        yield self.http_client.post_json_get_json(self.syncing_users_url, {
-            "process_id": self.process_id,
-            "syncing_users": [
-                user_id for user_id, count in self.user_to_num_current_syncs.items()
-                if count > 0
-            ],
-        })
-
-        # Unset the flag as we are no longer sending an update.
-        self._sending_sync = False
-        if self._need_to_send_sync:
-            # If something happened while we were sending the update then
-            # we might need to send another update.
-            # TODO: Check if the update that was sent matches the current state
-            # as we only need to send an update if they are different.
-            self._need_to_send_sync = False
-            yield self._send_syncing_users_now()
+        return defer.succeed(_user_syncing())
 
     @defer.inlineCallbacks
     def notify_from_replication(self, states, stream_id):
@@ -223,26 +217,24 @@ class SynchrotronPresence(object):
         )
 
     @defer.inlineCallbacks
-    def process_replication(self, result):
-        stream = result.get("presence", {"rows": []})
-        states = []
-        for row in stream["rows"]:
-            (
-                position, user_id, state, last_active_ts,
-                last_federation_update_ts, last_user_sync_ts, status_msg,
-                currently_active
-            ) = row
-            state = UserPresenceState(
-                user_id, state, last_active_ts,
-                last_federation_update_ts, last_user_sync_ts, status_msg,
-                currently_active
-            )
-            self.user_to_current_state[user_id] = state
-            states.append(state)
+    def process_replication_rows(self, token, rows):
+        states = [UserPresenceState(
+            row.user_id, row.state, row.last_active_ts,
+            row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg,
+            row.currently_active
+        ) for row in rows]
+
+        for state in states:
+            self.user_to_current_state[state.user_id] = state
 
-        if states and "position" in stream:
-            stream_id = int(stream["position"])
-            yield self.notify_from_replication(states, stream_id)
+        stream_id = token
+        yield self.notify_from_replication(states, stream_id)
+
+    def get_currently_syncing_users(self):
+        return [
+            user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
+            if count > 0
+        ]
 
 
 class SynchrotronTyping(object):
@@ -257,16 +249,12 @@ class SynchrotronTyping(object):
         # value which we *must* use for the next replication request.
         return {"typing": self._latest_room_serial}
 
-    def process_replication(self, result):
-        stream = result.get("typing")
-        if stream:
-            self._latest_room_serial = int(stream["position"])
+    def process_replication_rows(self, token, rows):
+        self._latest_room_serial = token
 
-            for row in stream["rows"]:
-                position, room_id, typing_json = row
-                typing = json.loads(typing_json)
-                self._room_serials[room_id] = position
-                self._room_typing[room_id] = typing
+        for row in rows:
+            self._room_serials[row.room_id] = token
+            self._room_typing[row.room_id] = row.user_ids
 
 
 class SynchrotronApplicationService(object):
@@ -351,120 +339,89 @@ class SynchrotronServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-        notifier = self.get_notifier()
-        presence_handler = self.get_presence_handler()
-        typing_handler = self.get_typing_handler()
-
-        def notify_from_stream(
-            result, stream_name, stream_key, room=None, user=None
-        ):
-            stream = result.get(stream_name)
-            if stream:
-                position_index = stream["field_names"].index("position")
-                if room:
-                    room_index = stream["field_names"].index(room)
-                if user:
-                    user_index = stream["field_names"].index(user)
-
-                users = ()
-                rooms = ()
-                for row in stream["rows"]:
-                    position = row[position_index]
-
-                    if user:
-                        users = (row[user_index],)
-
-                    if room:
-                        rooms = (row[room_index],)
-
-                    notifier.on_new_event(
-                        stream_key, position, users=users, rooms=rooms
-                    )
+        self.get_tcp_replication().start_replication(self)
 
-        @defer.inlineCallbacks
-        def notify_device_list_update(result):
-            stream = result.get("device_lists")
-            if not stream:
-                return
+    def build_tcp_replication(self):
+        return SyncReplicationHandler(self)
 
-            position_index = stream["field_names"].index("position")
-            user_index = stream["field_names"].index("user_id")
+    def build_presence_handler(self):
+        return SynchrotronPresence(self)
 
-            for row in stream["rows"]:
-                position = row[position_index]
-                user_id = row[user_index]
+    def build_typing_handler(self):
+        return SynchrotronTyping(self)
 
-                rooms = yield store.get_rooms_for_user(user_id)
-                room_ids = [r.room_id for r in rooms]
 
-                notifier.on_new_event(
-                    "device_list_key", position, rooms=room_ids,
-                )
+class SyncReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(SyncReplicationHandler, self).__init__(hs.get_datastore())
 
-        @defer.inlineCallbacks
-        def notify(result):
-            stream = result.get("events")
-            if stream:
-                max_position = stream["position"]
-                for row in stream["rows"]:
-                    position = row[0]
-                    internal = json.loads(row[1])
-                    event_json = json.loads(row[2])
-                    event = FrozenEvent(event_json, internal_metadata_dict=internal)
-                    extra_users = ()
-                    if event.type == EventTypes.Member:
-                        extra_users = (event.state_key,)
-                    notifier.on_new_room_event(
-                        event, position, max_position, extra_users
-                    )
+        self.store = hs.get_datastore()
+        self.typing_handler = hs.get_typing_handler()
+        self.presence_handler = hs.get_presence_handler()
+        self.notifier = hs.get_notifier()
 
-            notify_from_stream(
-                result, "push_rules", "push_rules_key", user="user_id"
-            )
-            notify_from_stream(
-                result, "user_account_data", "account_data_key", user="user_id"
-            )
-            notify_from_stream(
-                result, "room_account_data", "account_data_key", user="user_id"
+        self.presence_handler.sync_callback = self.send_user_sync
+
+    def on_rdata(self, stream_name, token, rows):
+        super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+
+        preserve_fn(self.process_and_notify)(stream_name, token, rows)
+
+    def get_streams_to_replicate(self):
+        args = super(SyncReplicationHandler, self).get_streams_to_replicate()
+        args.update(self.typing_handler.stream_positions())
+        return args
+
+    def get_currently_syncing_users(self):
+        return self.presence_handler.get_currently_syncing_users()
+
+    @defer.inlineCallbacks
+    def process_and_notify(self, stream_name, token, rows):
+        if stream_name == "events":
+            # We shouldn't get multiple rows per token for the events stream,
+            # so we don't need to optimise this for multiple rows.
+            for row in rows:
+                event = yield self.store.get_event(row.event_id)
+                extra_users = ()
+                if event.type == EventTypes.Member:
+                    extra_users = (event.state_key,)
+                max_token = self.store.get_room_max_stream_ordering()
+                self.notifier.on_new_room_event(
+                    event, token, max_token, extra_users
+                )
+        elif stream_name == "push_rules":
+            self.notifier.on_new_event(
+                "push_rules_key", token, users=[row.user_id for row in rows],
             )
-            notify_from_stream(
-                result, "tag_account_data", "account_data_key", user="user_id"
+        elif stream_name in ("account_data", "tag_account_data",):
+            self.notifier.on_new_event(
+                "account_data_key", token, users=[row.user_id for row in rows],
             )
-            notify_from_stream(
-                result, "receipts", "receipt_key", room="room_id"
+        elif stream_name == "receipts":
+            self.notifier.on_new_event(
+                "receipt_key", token, rooms=[row.room_id for row in rows],
             )
-            notify_from_stream(
-                result, "typing", "typing_key", room="room_id"
+        elif stream_name == "typing":
+            self.typing_handler.process_replication_rows(token, rows)
+            self.notifier.on_new_event(
+                "typing_key", token, rooms=[row.room_id for row in rows],
             )
-            notify_from_stream(
-                result, "to_device", "to_device_key", user="user_id"
+        elif stream_name == "to_device":
+            entities = [row.entity for row in rows if row.entity.startswith("@")]
+            if entities:
+                self.notifier.on_new_event(
+                    "to_device_key", token, users=entities,
+                )
+        elif stream_name == "device_lists":
+            all_room_ids = set()
+            for row in rows:
+                room_ids = yield self.store.get_rooms_for_user(row.user_id)
+                all_room_ids.update(room_ids)
+            self.notifier.on_new_event(
+                "device_list_key", token, rooms=all_room_ids,
             )
-            yield notify_device_list_update(result)
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args.update(typing_handler.stream_positions())
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-                typing_handler.process_replication(result)
-                yield presence_handler.process_replication(result)
-                yield notify(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(5)
-
-    def build_presence_handler(self):
-        return SynchrotronPresence(self)
-
-    def build_typing_handler(self):
-        return SynchrotronTyping(self)
+        elif stream_name == "presence":
+            yield self.presence_handler.process_replication_rows(token, rows)
 
 
 def start(config_options):
@@ -478,7 +435,7 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.synchrotron"
 
-    setup_logging(config.worker_log_config, config.worker_log_file)
+    setup_logging(config, use_worker_options=True)
 
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
@@ -497,7 +454,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
@@ -506,7 +467,6 @@ def start(config_options):
 
     def start():
         ss.get_datastore().start_profiling()
-        ss.replicate()
         ss.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
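
The presence changes replace the periodic "POST the full set of syncing users" machinery with a pending-offline map: a user who stops syncing is only reported offline once they have stayed quiet for ten seconds, so a gap between consecutive /sync requests no longer produces an offline/online flap on the master. A standalone sketch of that debounce, with an injected millisecond clock and send_user_sync standing in for the replication call of the same name:

    import time

    class PresenceDebouncer(object):
        OFFLINE_AFTER_MS = 10 * 1000

        def __init__(self, clock_ms, send_user_sync):
            self.clock_ms = clock_ms
            self.send_user_sync = send_user_sync
            self.users_going_offline = {}  # user_id -> last_sync_ms

        def mark_as_coming_online(self, user_id):
            if self.users_going_offline.pop(user_id, None) is None:
                # The master was never told they stopped, so there is nothing
                # to undo; tell it they are syncing now.
                self.send_user_sync(user_id, True, self.clock_ms())

        def mark_as_going_offline(self, user_id):
            self.users_going_offline[user_id] = self.clock_ms()

        def send_stop_syncing(self):
            # Called periodically (every 10s in the diff above).
            now = self.clock_ms()
            for user_id, last_sync_ms in list(self.users_going_offline.items()):
                if now - last_sync_ms > self.OFFLINE_AFTER_MS:
                    self.users_going_offline.pop(user_id, None)
                    self.send_user_sync(user_id, False, last_sync_ms)

    p = PresenceDebouncer(
        clock_ms=lambda: int(time.time() * 1000),
        send_user_sync=lambda user_id, is_syncing, last_sync_ms: None,
    )
    p.mark_as_coming_online("@alice:example.com")  # sends (True, now)
    p.mark_as_going_offline("@alice:example.com")
    p.send_stop_syncing()  # sends (False, ...) only once 10s have passed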
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index e84045c7d1..8223734845 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -23,14 +23,27 @@ import signal
 import subprocess
 import sys
 import yaml
+import errno
+import time
 
 SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
 
 GREEN = "\x1b[1;32m"
+YELLOW = "\x1b[1;33m"
 RED = "\x1b[1;31m"
 NORMAL = "\x1b[m"
 
 
+def pid_running(pid):
+    try:
+        os.kill(pid, 0)
+        return True
+    except OSError as err:
+        if err.errno == errno.EPERM:
+            return True
+        return False
+
+
 def write(message, colour=NORMAL, stream=sys.stdout):
     if colour == NORMAL:
         stream.write(message + "\n")
@@ -38,6 +51,11 @@ def write(message, colour=NORMAL, stream=sys.stdout):
         stream.write(colour + message + NORMAL + "\n")
 
 
+def abort(message, colour=RED, stream=sys.stderr):
+    write(message, colour, stream)
+    sys.exit(1)
+
+
 def start(configfile):
     write("Starting ...")
     args = SYNAPSE
@@ -45,7 +63,8 @@ def start(configfile):
 
     try:
         subprocess.check_call(args)
-        write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
+        write("started synapse.app.homeserver(%r)" %
+              (configfile,), colour=GREEN)
     except subprocess.CalledProcessError as e:
         write(
             "error starting (exit code: %d); see above for logs" % e.returncode,
@@ -76,8 +95,16 @@ def start_worker(app, configfile, worker_configfile):
 def stop(pidfile, app):
     if os.path.exists(pidfile):
         pid = int(open(pidfile).read())
-        os.kill(pid, signal.SIGTERM)
-        write("stopped %s" % (app,), colour=GREEN)
+        try:
+            os.kill(pid, signal.SIGTERM)
+            write("stopped %s" % (app,), colour=GREEN)
+        except OSError as err:
+            if err.errno == errno.ESRCH:
+                write("%s not running" % (app,), colour=YELLOW)
+            elif err.errno == errno.EPERM:
+                abort("Cannot stop %s: Operation not permitted" % (app,))
+            else:
+                abort("Cannot stop %s: Unknown error" % (app,))
 
 
 Worker = collections.namedtuple("Worker", [
@@ -175,7 +202,8 @@ def main():
         worker_app = worker_config["worker_app"]
         worker_pidfile = worker_config["worker_pid_file"]
         worker_daemonize = worker_config["worker_daemonize"]
-        assert worker_daemonize  # TODO print something more user friendly
+        assert worker_daemonize, "In config %r: expected '%s' to be True" % (
+            worker_configfile, "worker_daemonize")
         worker_cache_factor = worker_config.get("synctl_cache_factor")
         workers.append(Worker(
             worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
@@ -190,7 +218,19 @@ def main():
         if start_stop_synapse:
             stop(pidfile, "synapse.app.homeserver")
 
-        # TODO: Wait for synapse to actually shutdown before starting it again
+    # Wait for synapse to actually shut down before starting it again
+    if action == "restart":
+        running_pids = []
+        if start_stop_synapse and os.path.exists(pidfile):
+            running_pids.append(int(open(pidfile).read()))
+        for worker in workers:
+            if os.path.exists(worker.pidfile):
+                running_pids.append(int(open(worker.pidfile).read()))
+        if len(running_pids) > 0:
+            write("Waiting for process to exit before restarting...")
+            for running_pid in running_pids:
+                while pid_running(running_pid):
+                    time.sleep(0.2)
 
     if action == "start" or action == "restart":
         if start_stop_synapse:
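
pid_running() leans on the fact that os.kill(pid, 0) delivers no signal at all: the kernel only performs the existence and permission checks, so EPERM means "the process exists but belongs to someone else" and is counted as running. A self-contained version with a quick check:

    import errno
    import os

    def pid_running(pid):
        try:
            os.kill(pid, 0)  # signal 0: existence/permission probe only
            return True
        except OSError as err:
            # EPERM: the process exists but we may not signal it.
            return err.errno == errno.EPERM

    print(pid_running(os.getpid()))  # True: a process can always probe itself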