summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2016-08-11 14:09:13 +0100
committerDavid Baker <dave@matrix.org>2016-08-11 14:09:13 +0100
commitb4ecf0b886c67437901e0af457c5f801ebde9a72 (patch)
treeef66b0684edcfeb4ad68d20375641f4654393f44 /synapse/app
parentInclude the ts the notif was received at (diff)
parentMerge pull request #1003 from matrix-org/erikj/redaction_prev_content (diff)
downloadsynapse-b4ecf0b886c67437901e0af457c5f801ebde9a72.tar.xz
Merge remote-tracking branch 'origin/develop' into dbkr/notifications_api
Diffstat (limited to '')
-rw-r--r--synapse/app/__init__.py8
-rw-r--r--synapse/app/federation_reader.py206
-rwxr-xr-xsynapse/app/homeserver.py16
-rw-r--r--synapse/app/pusher.py136
-rw-r--r--synapse/app/synchrotron.py465
-rwxr-xr-xsynapse/app/synctl.py178
-rw-r--r--synapse/appservice/scheduler.py14
7 files changed, 890 insertions, 133 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 1bc4279807..9c2b627590 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -16,13 +16,11 @@
 import sys
 sys.dont_write_bytecode = True
 
-from synapse.python_dependencies import (
-    check_requirements, MissingRequirementError
-)  # NOQA
+from synapse import python_dependencies   # noqa: E402
 
 try:
-    check_requirements()
-except MissingRequirementError as e:
+    python_dependencies.check_requirements()
+except python_dependencies.MissingRequirementError as e:
     message = "\n".join([
         "Missing Requirement: %s" % (e.message,),
         "To install run:",
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
new file mode 100644
index 0000000000..7355499ae2
--- /dev/null
+++ b/synapse/app/federation_reader.py
@@ -0,0 +1,206 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.federation.transport.server import TransportLayerServer
+from synapse.crypto import context_factory
+
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.federation_reader")
+
+
+class FederationReaderSlavedStore(
+    SlavedEventStore,
+    SlavedKeyStore,
+    RoomStore,
+    DirectoryStore,
+    TransactionStore,
+    BaseSlavedStore,
+):
+    pass
+
+
+class FederationReaderServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "federation":
+                    resources.update({
+                        FEDERATION_PREFIX: TransportLayerServer(self),
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse federation reader now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(5)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse federation reader", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.federation_reader"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    database_engine = create_engine(config.database_config)
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ss = FederationReaderServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ss.setup()
+    ss.get_handlers()
+    ss.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ss.get_datastore().start_profiling()
+        ss.replicate()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-federation-reader",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index df675c0ed4..40e6f65236 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -16,6 +16,7 @@
 
 import synapse
 
+import gc
 import logging
 import os
 import sys
@@ -50,6 +51,7 @@ from synapse.api.urls import (
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.util.logcontext import LoggingContext
+from synapse.metrics import register_memory_metrics
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
 from synapse.federation.transport.server import TransportLayerServer
@@ -146,7 +148,7 @@ class SynapseHomeServer(HomeServer):
                         MEDIA_PREFIX: media_repo,
                         LEGACY_MEDIA_PREFIX: media_repo,
                         CONTENT_REPO_PREFIX: ContentRepoResource(
-                            self, self.config.uploads_path, self.auth, self.content_addr
+                            self, self.config.uploads_path
                         ),
                     })
 
@@ -265,10 +267,9 @@ def setup(config_options):
         HomeServer
     """
     try:
-        config = HomeServerConfig.load_config(
+        config = HomeServerConfig.load_or_generate_config(
             "Synapse Homeserver",
             config_options,
-            generate_section="Homeserver"
         )
     except ConfigError as e:
         sys.stderr.write("\n" + e.message + "\n")
@@ -284,7 +285,7 @@ def setup(config_options):
     # check any extra requirements we have now we have a config
     check_requirements(config)
 
-    version_string = get_version_string("Synapse", synapse)
+    version_string = "Synapse/" + get_version_string(synapse)
 
     logger.info("Server hostname: %s", config.server_name)
     logger.info("Server version: %s", version_string)
@@ -301,7 +302,6 @@ def setup(config_options):
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
         config=config,
-        content_addr=config.content_addr,
         version_string=version_string,
         database_engine=database_engine,
     )
@@ -336,6 +336,8 @@ def setup(config_options):
         hs.get_datastore().start_doing_background_updates()
         hs.get_replication_layer().start_get_pdu_cache()
 
+        register_memory_metrics(hs)
+
     reactor.callWhenRunning(start)
 
     return hs
@@ -351,6 +353,8 @@ class SynapseService(service.Service):
     def startService(self):
         hs = setup(self.config)
         change_resource_limit(hs.config.soft_file_limit)
+        if hs.config.gc_thresholds:
+            gc.set_threshold(*hs.config.gc_thresholds)
 
     def stopService(self):
         return self._port.stopListening()
@@ -422,6 +426,8 @@ def run(hs):
         # sys.settrace(logcontext_tracer)
         with LoggingContext("run"):
             change_resource_limit(hs.config.soft_file_limit)
+            if hs.config.gc_thresholds:
+                gc.set_threshold(*hs.config.gc_thresholds)
             reactor.run()
 
     if hs.config.daemonize:
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 135dd58c15..c8dde0fcb8 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -18,9 +18,8 @@ import synapse
 
 from synapse.server import HomeServer
 from synapse.config._base import ConfigError
-from synapse.config.database import DatabaseConfig
-from synapse.config.logger import LoggingConfig
-from synapse.config.emailconfig import EmailConfig
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
 from synapse.http.site import SynapseSite
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.storage.roommember import RoomMemberStore
@@ -44,61 +43,11 @@ from daemonize import Daemonize
 
 import sys
 import logging
+import gc
 
 logger = logging.getLogger("synapse.app.pusher")
 
 
-class SlaveConfig(DatabaseConfig):
-    def read_config(self, config):
-        self.replication_url = config["replication_url"]
-        self.server_name = config["server_name"]
-        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
-            "use_insecure_ssl_client_just_for_testing_do_not_use", False
-        )
-        self.user_agent_suffix = None
-        self.start_pushers = True
-        self.listeners = config["listeners"]
-        self.soft_file_limit = config.get("soft_file_limit")
-        self.daemonize = config.get("daemonize")
-        self.pid_file = self.abspath(config.get("pid_file"))
-        self.public_baseurl = config["public_baseurl"]
-
-    def default_config(self, server_name, **kwargs):
-        pid_file = self.abspath("pusher.pid")
-        return """\
-        # Slave configuration
-
-        # The replication listener on the synapse to talk to.
-        #replication_url: https://localhost:{replication_port}/_synapse/replication
-
-        server_name: "%(server_name)s"
-
-        listeners: []
-        # Enable a ssh manhole listener on the pusher.
-        # - type: manhole
-        #   port: {manhole_port}
-        #   bind_address: 127.0.0.1
-        # Enable a metric listener on the pusher.
-        # - type: http
-        #   port: {metrics_port}
-        #   bind_address: 127.0.0.1
-        #   resources:
-        #    - names: ["metrics"]
-        #      compress: False
-
-        report_stats: False
-
-        daemonize: False
-
-        pid_file: %(pid_file)s
-
-        """ % locals()
-
-
-class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig):
-    pass
-
-
 class PusherSlaveStore(
     SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
     SlavedAccountDataStore
@@ -163,7 +112,7 @@ class PusherServer(HomeServer):
 
     def remove_pusher(self, app_id, push_key, user_id):
         http_client = self.get_simple_http_client()
-        replication_url = self.config.replication_url
+        replication_url = self.config.worker_replication_url
         url = replication_url + "/remove_pushers"
         return http_client.post_json_get_json(url, {
             "remove": [{
@@ -196,8 +145,8 @@ class PusherServer(HomeServer):
         )
         logger.info("Synapse pusher now listening on port %d", port)
 
-    def start_listening(self):
-        for listener in self.config.listeners:
+    def start_listening(self, listeners):
+        for listener in listeners:
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
@@ -217,7 +166,7 @@ class PusherServer(HomeServer):
     def replicate(self):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
-        replication_url = self.config.replication_url
+        replication_url = self.config.worker_replication_url
         pusher_pool = self.get_pusherpool()
         clock = self.get_clock()
 
@@ -290,22 +239,33 @@ class PusherServer(HomeServer):
                 poke_pushers(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
-                sleep(30)
+                yield sleep(30)
 
 
-def setup(config_options):
+def start(config_options):
     try:
-        config = PusherSlaveConfig.load_config(
+        config = HomeServerConfig.load_config(
             "Synapse pusher", config_options
         )
     except ConfigError as e:
         sys.stderr.write("\n" + e.message + "\n")
         sys.exit(1)
 
-    if not config:
-        sys.exit(0)
+    assert config.worker_app == "synapse.app.pusher"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    if config.start_pushers:
+        sys.stderr.write(
+            "\nThe pushers must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``start_pushers: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
 
-    config.setup_logging()
+    # Force the pushers to start since they will be disabled in the main config
+    config.start_pushers = True
 
     database_engine = create_engine(config.database_config)
 
@@ -313,14 +273,20 @@ def setup(config_options):
         config.server_name,
         db_config=config.database_config,
         config=config,
-        version_string=get_version_string("Synapse", synapse),
+        version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
     )
 
     ps.setup()
-    ps.start_listening()
-
-    change_resource_limit(ps.config.soft_file_limit)
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
 
     def start():
         ps.replicate()
@@ -329,28 +295,20 @@ def setup(config_options):
 
     reactor.callWhenRunning(start)
 
-    return ps
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-pusher",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
 
 
 if __name__ == '__main__':
     with LoggingContext("main"):
-        ps = setup(sys.argv[1:])
-
-        if ps.config.daemonize:
-            def run():
-                with LoggingContext("run"):
-                    change_resource_limit(ps.config.soft_file_limit)
-                    reactor.run()
-
-            daemon = Daemonize(
-                app="synapse-pusher",
-                pid=ps.config.pid_file,
-                action=run,
-                auto_close_fds=False,
-                verbose=True,
-                logger=logger,
-            )
-
-            daemon.start()
-        else:
-            reactor.run()
+        ps = start(sys.argv[1:])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
new file mode 100644
index 0000000000..215ccfd522
--- /dev/null
+++ b/synapse/app/synchrotron.py
@@ -0,0 +1,465 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.api.constants import EventTypes, PresenceState
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.events import FrozenEvent
+from synapse.handlers.presence import PresenceHandler
+from synapse.http.site import SynapseSite
+from synapse.http.server import JsonResource
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.rest.client.v2_alpha import sync
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.server import HomeServer
+from synapse.storage.client_ips import ClientIpStore
+from synapse.storage.engines import create_engine
+from synapse.storage.presence import PresenceStore, UserPresenceState
+from synapse.storage.roommember import RoomMemberStore
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.stringutils import random_string
+from synapse.util.versionstring import get_version_string
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import contextlib
+import gc
+import ujson as json
+
+logger = logging.getLogger("synapse.app.synchrotron")
+
+
+class SynchrotronSlavedStore(
+    SlavedPushRuleStore,
+    SlavedEventStore,
+    SlavedReceiptsStore,
+    SlavedAccountDataStore,
+    SlavedApplicationServiceStore,
+    SlavedRegistrationStore,
+    SlavedFilteringStore,
+    SlavedPresenceStore,
+    BaseSlavedStore,
+    ClientIpStore,  # After BaseSlavedStore because the constructor is different
+):
+    # XXX: This is a bit broken because we don't persist forgotten rooms
+    # in a way that they can be streamed. This means that we don't have a
+    # way to invalidate the forgotten rooms cache correctly.
+    # For now we expire the cache every 10 minutes.
+    BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
+    who_forgot_in_room = (
+        RoomMemberStore.__dict__["who_forgot_in_room"]
+    )
+
+    # XXX: This is a bit broken because we don't persist the accepted list in a
+    # way that can be replicated. This means that we don't have a way to
+    # invalidate the cache correctly.
+    get_presence_list_accepted = PresenceStore.__dict__[
+        "get_presence_list_accepted"
+    ]
+
+UPDATE_SYNCING_USERS_MS = 10 * 1000
+
+
+class SynchrotronPresence(object):
+    def __init__(self, hs):
+        self.http_client = hs.get_simple_http_client()
+        self.store = hs.get_datastore()
+        self.user_to_num_current_syncs = {}
+        self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
+        self.clock = hs.get_clock()
+
+        active_presence = self.store.take_presence_startup_info()
+        self.user_to_current_state = {
+            state.user_id: state
+            for state in active_presence
+        }
+
+        self.process_id = random_string(16)
+        logger.info("Presence process_id is %r", self.process_id)
+
+        self._sending_sync = False
+        self._need_to_send_sync = False
+        self.clock.looping_call(
+            self._send_syncing_users_regularly,
+            UPDATE_SYNCING_USERS_MS,
+        )
+
+        reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+
+    def set_state(self, user, state):
+        # TODO Hows this supposed to work?
+        pass
+
+    get_states = PresenceHandler.get_states.__func__
+    current_state_for_users = PresenceHandler.current_state_for_users.__func__
+
+    @defer.inlineCallbacks
+    def user_syncing(self, user_id, affect_presence):
+        if affect_presence:
+            curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
+            self.user_to_num_current_syncs[user_id] = curr_sync + 1
+            prev_states = yield self.current_state_for_users([user_id])
+            if prev_states[user_id].state == PresenceState.OFFLINE:
+                # TODO: Don't block the sync request on this HTTP hit.
+                yield self._send_syncing_users_now()
+
+        def _end():
+            # We check that the user_id is in user_to_num_current_syncs because
+            # user_to_num_current_syncs may have been cleared if we are
+            # shutting down.
+            if affect_presence and user_id in self.user_to_num_current_syncs:
+                self.user_to_num_current_syncs[user_id] -= 1
+
+        @contextlib.contextmanager
+        def _user_syncing():
+            try:
+                yield
+            finally:
+                _end()
+
+        defer.returnValue(_user_syncing())
+
+    @defer.inlineCallbacks
+    def _on_shutdown(self):
+        # When the synchrotron is shutdown tell the master to clear the in
+        # progress syncs for this process
+        self.user_to_num_current_syncs.clear()
+        yield self._send_syncing_users_now()
+
+    def _send_syncing_users_regularly(self):
+        # Only send an update if we aren't in the middle of sending one.
+        if not self._sending_sync:
+            preserve_fn(self._send_syncing_users_now)()
+
+    @defer.inlineCallbacks
+    def _send_syncing_users_now(self):
+        if self._sending_sync:
+            # We don't want to race with sending another update.
+            # Instead we wait for that update to finish and send another
+            # update afterwards.
+            self._need_to_send_sync = True
+            return
+
+        # Flag that we are sending an update.
+        self._sending_sync = True
+
+        yield self.http_client.post_json_get_json(self.syncing_users_url, {
+            "process_id": self.process_id,
+            "syncing_users": [
+                user_id for user_id, count in self.user_to_num_current_syncs.items()
+                if count > 0
+            ],
+        })
+
+        # Unset the flag as we are no longer sending an update.
+        self._sending_sync = False
+        if self._need_to_send_sync:
+            # If something happened while we were sending the update then
+            # we might need to send another update.
+            # TODO: Check if the update that was sent matches the current state
+            # as we only need to send an update if they are different.
+            self._need_to_send_sync = False
+            yield self._send_syncing_users_now()
+
+    def process_replication(self, result):
+        stream = result.get("presence", {"rows": []})
+        for row in stream["rows"]:
+            (
+                position, user_id, state, last_active_ts,
+                last_federation_update_ts, last_user_sync_ts, status_msg,
+                currently_active
+            ) = row
+            self.user_to_current_state[user_id] = UserPresenceState(
+                user_id, state, last_active_ts,
+                last_federation_update_ts, last_user_sync_ts, status_msg,
+                currently_active
+            )
+
+
+class SynchrotronTyping(object):
+    def __init__(self, hs):
+        self._latest_room_serial = 0
+        self._room_serials = {}
+        self._room_typing = {}
+
+    def stream_positions(self):
+        return {"typing": self._latest_room_serial}
+
+    def process_replication(self, result):
+        stream = result.get("typing")
+        if stream:
+            self._latest_room_serial = int(stream["position"])
+
+            for row in stream["rows"]:
+                position, room_id, typing_json = row
+                typing = json.loads(typing_json)
+                self._room_serials[room_id] = position
+                self._room_typing[room_id] = typing
+
+
+class SynchrotronApplicationService(object):
+    def notify_interested_services(self, event):
+        pass
+
+
+class SynchrotronServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "client":
+                    resource = JsonResource(self, canonical_json=False)
+                    sync.register_servlets(self, resource)
+                    resources.update({
+                        "/_matrix/client/r0": resource,
+                        "/_matrix/client/unstable": resource,
+                        "/_matrix/client/v2_alpha": resource,
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse synchrotron now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+        clock = self.get_clock()
+        notifier = self.get_notifier()
+        presence_handler = self.get_presence_handler()
+        typing_handler = self.get_typing_handler()
+
+        def expire_broken_caches():
+            store.who_forgot_in_room.invalidate_all()
+            store.get_presence_list_accepted.invalidate_all()
+
+        def notify_from_stream(
+            result, stream_name, stream_key, room=None, user=None
+        ):
+            stream = result.get(stream_name)
+            if stream:
+                position_index = stream["field_names"].index("position")
+                if room:
+                    room_index = stream["field_names"].index(room)
+                if user:
+                    user_index = stream["field_names"].index(user)
+
+                users = ()
+                rooms = ()
+                for row in stream["rows"]:
+                    position = row[position_index]
+
+                    if user:
+                        users = (row[user_index],)
+
+                    if room:
+                        rooms = (row[room_index],)
+
+                    notifier.on_new_event(
+                        stream_key, position, users=users, rooms=rooms
+                    )
+
+        def notify(result):
+            stream = result.get("events")
+            if stream:
+                max_position = stream["position"]
+                for row in stream["rows"]:
+                    position = row[0]
+                    internal = json.loads(row[1])
+                    event_json = json.loads(row[2])
+                    event = FrozenEvent(event_json, internal_metadata_dict=internal)
+                    extra_users = ()
+                    if event.type == EventTypes.Member:
+                        extra_users = (event.state_key,)
+                    notifier.on_new_room_event(
+                        event, position, max_position, extra_users
+                    )
+
+            notify_from_stream(
+                result, "push_rules", "push_rules_key", user="user_id"
+            )
+            notify_from_stream(
+                result, "user_account_data", "account_data_key", user="user_id"
+            )
+            notify_from_stream(
+                result, "room_account_data", "account_data_key", user="user_id"
+            )
+            notify_from_stream(
+                result, "tag_account_data", "account_data_key", user="user_id"
+            )
+            notify_from_stream(
+                result, "receipts", "receipt_key", room="room_id"
+            )
+            notify_from_stream(
+                result, "typing", "typing_key", room="room_id"
+            )
+
+        next_expire_broken_caches_ms = 0
+        while True:
+            try:
+                args = store.stream_positions()
+                args.update(typing_handler.stream_positions())
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                now_ms = clock.time_msec()
+                if now_ms > next_expire_broken_caches_ms:
+                    expire_broken_caches()
+                    next_expire_broken_caches_ms = (
+                        now_ms + store.BROKEN_CACHE_EXPIRY_MS
+                    )
+                yield store.process_replication(result)
+                typing_handler.process_replication(result)
+                presence_handler.process_replication(result)
+                notify(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(5)
+
+    def build_presence_handler(self):
+        return SynchrotronPresence(self)
+
+    def build_typing_handler(self):
+        return SynchrotronTyping(self)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse synchrotron", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.synchrotron"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    database_engine = create_engine(config.database_config)
+
+    ss = SynchrotronServer(
+        config.server_name,
+        db_config=config.database_config,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+        application_service_handler=SynchrotronApplicationService(),
+    )
+
+    ss.setup()
+    ss.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ss.get_datastore().start_profiling()
+        ss.replicate()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-synchrotron",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 39f4bf6e53..bb41962d47 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -14,11 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import sys
+import argparse
+import collections
+import glob
 import os
 import os.path
-import subprocess
 import signal
+import subprocess
+import sys
 import yaml
 
 SYNAPSE = ["python", "-B", "-m", "synapse.app.homeserver"]
@@ -28,60 +31,181 @@ RED = "\x1b[1;31m"
 NORMAL = "\x1b[m"
 
 
+def write(message, colour=NORMAL, stream=sys.stdout):
+    if colour == NORMAL:
+        stream.write(message + "\n")
+    else:
+        stream.write(colour + message + NORMAL + "\n")
+
+
 def start(configfile):
-    print ("Starting ...")
+    write("Starting ...")
     args = SYNAPSE
     args.extend(["--daemonize", "-c", configfile])
 
     try:
         subprocess.check_call(args)
-        print (GREEN + "started" + NORMAL)
+        write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
+    except subprocess.CalledProcessError as e:
+        write(
+            "error starting (exit code: %d); see above for logs" % e.returncode,
+            colour=RED,
+        )
+
+
+def start_worker(app, configfile, worker_configfile):
+    args = [
+        "python", "-B",
+        "-m", app,
+        "-c", configfile,
+        "-c", worker_configfile
+    ]
+
+    try:
+        subprocess.check_call(args)
+        write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
     except subprocess.CalledProcessError as e:
-        print (
-            RED +
-            "error starting (exit code: %d); see above for logs" % e.returncode +
-            NORMAL
+        write(
+            "error starting %s(%r) (exit code: %d); see above for logs" % (
+                app, worker_configfile, e.returncode,
+            ),
+            colour=RED,
         )
 
 
-def stop(pidfile):
+def stop(pidfile, app):
     if os.path.exists(pidfile):
         pid = int(open(pidfile).read())
         os.kill(pid, signal.SIGTERM)
-        print (GREEN + "stopped" + NORMAL)
+        write("stopped %s" % (app,), colour=GREEN)
+
+
+Worker = collections.namedtuple("Worker", [
+    "app", "configfile", "pidfile", "cache_factor"
+])
 
 
 def main():
-    configfile = sys.argv[2] if len(sys.argv) == 3 else "homeserver.yaml"
+
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "action",
+        choices=["start", "stop", "restart"],
+        help="whether to start, stop or restart the synapse",
+    )
+    parser.add_argument(
+        "configfile",
+        nargs="?",
+        default="homeserver.yaml",
+        help="the homeserver config file, defaults to homserver.yaml",
+    )
+    parser.add_argument(
+        "-w", "--worker",
+        metavar="WORKERCONFIG",
+        help="start or stop a single worker",
+    )
+    parser.add_argument(
+        "-a", "--all-processes",
+        metavar="WORKERCONFIGDIR",
+        help="start or stop all the workers in the given directory"
+             " and the main synapse process",
+    )
+
+    options = parser.parse_args()
+
+    if options.worker and options.all_processes:
+        write(
+            'Cannot use "--worker" with "--all-processes"',
+            stream=sys.stderr
+        )
+        sys.exit(1)
+
+    configfile = options.configfile
 
     if not os.path.exists(configfile):
-        sys.stderr.write(
+        write(
             "No config file found\n"
             "To generate a config file, run '%s -c %s --generate-config"
             " --server-name=<server name>'\n" % (
-                " ".join(SYNAPSE), configfile
-            )
+                " ".join(SYNAPSE), options.configfile
+            ),
+            stream=sys.stderr,
         )
         sys.exit(1)
 
-    config = yaml.load(open(configfile))
+    with open(configfile) as stream:
+        config = yaml.load(stream)
+
     pidfile = config["pid_file"]
-    cache_factor = config.get("synctl_cache_factor", None)
+    cache_factor = config.get("synctl_cache_factor")
+    start_stop_synapse = True
 
     if cache_factor:
         os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
 
-    action = sys.argv[1] if sys.argv[1:] else "usage"
-    if action == "start":
-        start(configfile)
-    elif action == "stop":
-        stop(pidfile)
-    elif action == "restart":
-        stop(pidfile)
-        start(configfile)
-    else:
-        sys.stderr.write("Usage: %s [start|stop|restart] [configfile]\n" % (sys.argv[0],))
-        sys.exit(1)
+    worker_configfiles = []
+    if options.worker:
+        start_stop_synapse = False
+        worker_configfile = options.worker
+        if not os.path.exists(worker_configfile):
+            write(
+                "No worker config found at %r" % (worker_configfile,),
+                stream=sys.stderr,
+            )
+            sys.exit(1)
+        worker_configfiles.append(worker_configfile)
+
+    if options.all_processes:
+        worker_configdir = options.all_processes
+        if not os.path.isdir(worker_configdir):
+            write(
+                "No worker config directory found at %r" % (worker_configdir,),
+                stream=sys.stderr,
+            )
+            sys.exit(1)
+        worker_configfiles.extend(sorted(glob.glob(
+            os.path.join(worker_configdir, "*.yaml")
+        )))
+
+    workers = []
+    for worker_configfile in worker_configfiles:
+        with open(worker_configfile) as stream:
+            worker_config = yaml.load(stream)
+        worker_app = worker_config["worker_app"]
+        worker_pidfile = worker_config["worker_pid_file"]
+        worker_daemonize = worker_config["worker_daemonize"]
+        assert worker_daemonize  # TODO print something more user friendly
+        worker_cache_factor = worker_config.get("synctl_cache_factor")
+        workers.append(Worker(
+            worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
+        ))
+
+    action = options.action
+
+    if action == "stop" or action == "restart":
+        for worker in workers:
+            stop(worker.pidfile, worker.app)
+
+        if start_stop_synapse:
+            stop(pidfile, "synapse.app.homeserver")
+
+        # TODO: Wait for synapse to actually shutdown before starting it again
+
+    if action == "start" or action == "restart":
+        if start_stop_synapse:
+            start(configfile)
+
+        for worker in workers:
+            if worker.cache_factor:
+                os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
+
+            start_worker(worker.app, configfile, worker.configfile)
+
+            if cache_factor:
+                os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+            else:
+                os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
 
 
 if __name__ == "__main__":
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 47a4e9f864..9afc8fd754 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -56,22 +56,22 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class AppServiceScheduler(object):
+class ApplicationServiceScheduler(object):
     """ Public facing API for this module. Does the required DI to tie the
     components together. This also serves as the "event_pool", which in this
     case is a simple array.
     """
 
-    def __init__(self, clock, store, as_api):
-        self.clock = clock
-        self.store = store
-        self.as_api = as_api
+    def __init__(self, hs):
+        self.clock = hs.get_clock()
+        self.store = hs.get_datastore()
+        self.as_api = hs.get_application_service_api()
 
         def create_recoverer(service, callback):
-            return _Recoverer(clock, store, as_api, service, callback)
+            return _Recoverer(self.clock, self.store, self.as_api, service, callback)
 
         self.txn_ctrl = _TransactionController(
-            clock, store, as_api, create_recoverer
+            self.clock, self.store, self.as_api, create_recoverer
         )
         self.queuer = _ServiceQueuer(self.txn_ctrl)