summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/__init__.py8
-rw-r--r--synapse/app/appservice.py214
-rw-r--r--synapse/app/client_reader.py220
-rw-r--r--synapse/app/federation_reader.py211
-rw-r--r--synapse/app/federation_sender.py331
-rwxr-xr-xsynapse/app/homeserver.py393
-rw-r--r--synapse/app/media_repository.py217
-rw-r--r--synapse/app/pusher.py303
-rw-r--r--synapse/app/synchrotron.py496
-rwxr-xr-xsynapse/app/synctl.py182
10 files changed, 2214 insertions, 361 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 1bc4279807..9c2b627590 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -16,13 +16,11 @@
 import sys
 sys.dont_write_bytecode = True
 
-from synapse.python_dependencies import (
-    check_requirements, MissingRequirementError
-)  # NOQA
+from synapse import python_dependencies   # noqa: E402
 
 try:
-    check_requirements()
-except MissingRequirementError as e:
+    python_dependencies.check_requirements()
+except python_dependencies.MissingRequirementError as e:
     message = "\n".join([
         "Missing Requirement: %s" % (e.message,),
         "To install run:",
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
new file mode 100644
index 0000000000..dd9ee406a1
--- /dev/null
+++ b/synapse/app/appservice.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.storage.engines import create_engine
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+
+from synapse import events
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.appservice")
+
+
+class AppserviceSlaveStore(
+    DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore,
+    SlavedRegistrationStore,
+):
+    pass
+
+
+class AppserviceServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse appservice now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+        appservice_handler = self.get_application_service_handler()
+
+        @defer.inlineCallbacks
+        def replicate(results):
+            stream = results.get("events")
+            if stream:
+                max_stream_id = stream["position"]
+                yield appservice_handler.notify_interested_services(max_stream_id)
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+                replicate(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(30)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse appservice", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.appservice"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    if config.notify_appservices:
+        sys.stderr.write(
+            "\nThe appservices must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``notify_appservices: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
+
+    # Force the pushers to start since they will be disabled in the main config
+    config.notify_appservices = True
+
+    ps = AppserviceServer(
+        config.server_name,
+        db_config=config.database_config,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ps.setup()
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ps.replicate()
+        ps.get_datastore().start_profiling()
+        ps.get_state_handler().start_caching()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-appservice",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
new file mode 100644
index 0000000000..0086a2977e
--- /dev/null
+++ b/synapse/app/client_reader.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.http.site import SynapseSite
+from synapse.http.server import JsonResource
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.rest.client.v1.room import PublicRoomListRestServlet
+from synapse.server import HomeServer
+from synapse.storage.client_ips import ClientIpStore
+from synapse.storage.engines import create_engine
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.crypto import context_factory
+
+from synapse import events
+
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.client_reader")
+
+
+class ClientReaderSlavedStore(
+    SlavedEventStore,
+    SlavedKeyStore,
+    RoomStore,
+    DirectoryStore,
+    SlavedApplicationServiceStore,
+    SlavedRegistrationStore,
+    BaseSlavedStore,
+    ClientIpStore,  # After BaseSlavedStore because the constructor is different
+):
+    pass
+
+
+class ClientReaderServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "client":
+                    resource = JsonResource(self, canonical_json=False)
+                    PublicRoomListRestServlet(self).register(resource)
+                    resources.update({
+                        "/_matrix/client/r0": resource,
+                        "/_matrix/client/unstable": resource,
+                        "/_matrix/client/v2_alpha": resource,
+                        "/_matrix/client/api/v1": resource,
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse client reader now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(5)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse client reader", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.client_reader"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ss = ClientReaderServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ss.setup()
+    ss.get_handlers()
+    ss.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ss.get_state_handler().start_caching()
+        ss.get_datastore().start_profiling()
+        ss.replicate()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-client-reader",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
new file mode 100644
index 0000000000..b5f59a9931
--- /dev/null
+++ b/synapse/app/federation_reader.py
@@ -0,0 +1,211 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.federation.transport.server import TransportLayerServer
+from synapse.crypto import context_factory
+
+from synapse import events
+
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.federation_reader")
+
+
+class FederationReaderSlavedStore(
+    SlavedEventStore,
+    SlavedKeyStore,
+    RoomStore,
+    DirectoryStore,
+    TransactionStore,
+    BaseSlavedStore,
+):
+    pass
+
+
+class FederationReaderServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "federation":
+                    resources.update({
+                        FEDERATION_PREFIX: TransportLayerServer(self),
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse federation reader now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(5)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse federation reader", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.federation_reader"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ss = FederationReaderServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ss.setup()
+    ss.get_handlers()
+    ss.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ss.get_state_handler().start_caching()
+        ss.get_datastore().start_profiling()
+        ss.replicate()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-federation-reader",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
new file mode 100644
index 0000000000..80ea4c8062
--- /dev/null
+++ b/synapse/app/federation_sender.py
@@ -0,0 +1,331 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.crypto import context_factory
+from synapse.http.site import SynapseSite
+from synapse.federation import send_queue
+from synapse.federation.units import Edu
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.storage.engines import create_engine
+from synapse.storage.presence import UserPresenceState
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+
+from synapse import events
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+import ujson as json
+
+logger = logging.getLogger("synapse.app.appservice")
+
+
+class FederationSenderSlaveStore(
+    SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+    SlavedRegistrationStore,
+):
+    pass
+
+
+class FederationSenderServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse federation_sender now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+        send_handler = FederationSenderHandler(self)
+
+        send_handler.on_start()
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args.update((yield send_handler.stream_positions()))
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+                yield send_handler.process_replication(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(30)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse federation sender", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.federation_sender"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    if config.send_federation:
+        sys.stderr.write(
+            "\nThe send_federation must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``send_federation: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
+
+    # Force the pushers to start since they will be disabled in the main config
+    config.send_federation = True
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ps = FederationSenderServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ps.setup()
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ps.replicate()
+        ps.get_datastore().start_profiling()
+        ps.get_state_handler().start_caching()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-federation-sender",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+class FederationSenderHandler(object):
+    """Processes the replication stream and forwards the appropriate entries
+    to the federation sender.
+    """
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.federation_sender = hs.get_federation_sender()
+
+        self._room_serials = {}
+        self._room_typing = {}
+
+    def on_start(self):
+        # There may be some events that are persisted but haven't been sent,
+        # so send them now.
+        self.federation_sender.notify_new_events(
+            self.store.get_room_max_stream_ordering()
+        )
+
+    @defer.inlineCallbacks
+    def stream_positions(self):
+        stream_id = yield self.store.get_federation_out_pos("federation")
+        defer.returnValue({
+            "federation": stream_id,
+
+            # Ack stuff we've "processed", this should only be called from
+            # one process.
+            "federation_ack": stream_id,
+        })
+
+    @defer.inlineCallbacks
+    def process_replication(self, result):
+        # The federation stream contains things that we want to send out, e.g.
+        # presence, typing, etc.
+        fed_stream = result.get("federation")
+        if fed_stream:
+            latest_id = int(fed_stream["position"])
+
+            # The federation stream containis a bunch of different types of
+            # rows that need to be handled differently. We parse the rows, put
+            # them into the appropriate collection and then send them off.
+            presence_to_send = {}
+            keyed_edus = {}
+            edus = {}
+            failures = {}
+            device_destinations = set()
+
+            # Parse the rows in the stream
+            for row in fed_stream["rows"]:
+                position, typ, content_js = row
+                content = json.loads(content_js)
+
+                if typ == send_queue.PRESENCE_TYPE:
+                    destination = content["destination"]
+                    state = UserPresenceState.from_dict(content["state"])
+
+                    presence_to_send.setdefault(destination, []).append(state)
+                elif typ == send_queue.KEYED_EDU_TYPE:
+                    key = content["key"]
+                    edu = Edu(**content["edu"])
+
+                    keyed_edus.setdefault(
+                        edu.destination, {}
+                    )[(edu.destination, tuple(key))] = edu
+                elif typ == send_queue.EDU_TYPE:
+                    edu = Edu(**content)
+
+                    edus.setdefault(edu.destination, []).append(edu)
+                elif typ == send_queue.FAILURE_TYPE:
+                    destination = content["destination"]
+                    failure = content["failure"]
+
+                    failures.setdefault(destination, []).append(failure)
+                elif typ == send_queue.DEVICE_MESSAGE_TYPE:
+                    device_destinations.add(content["destination"])
+                else:
+                    raise Exception("Unrecognised federation type: %r", typ)
+
+            # We've finished collecting, send everything off
+            for destination, states in presence_to_send.items():
+                self.federation_sender.send_presence(destination, states)
+
+            for destination, edu_map in keyed_edus.items():
+                for key, edu in edu_map.items():
+                    self.federation_sender.send_edu(
+                        edu.destination, edu.edu_type, edu.content, key=key,
+                    )
+
+            for destination, edu_list in edus.items():
+                for edu in edu_list:
+                    self.federation_sender.send_edu(
+                        edu.destination, edu.edu_type, edu.content, key=None,
+                    )
+
+            for destination, failure_list in failures.items():
+                for failure in failure_list:
+                    self.federation_sender.send_failure(destination, failure)
+
+            for destination in device_destinations:
+                self.federation_sender.send_device_messages(destination)
+
+            # Record where we are in the stream.
+            yield self.store.update_federation_out_pos(
+                "federation", latest_id
+            )
+
+        # We also need to poke the federation sender when new events happen
+        event_stream = result.get("events")
+        if event_stream:
+            latest_pos = event_stream["position"]
+            self.federation_sender.notify_new_events(latest_pos)
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index fcdc8e6e10..54f35900f8 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -16,14 +16,10 @@
 
 import synapse
 
-import contextlib
+import gc
 import logging
 import os
-import re
-import resource
-import subprocess
 import sys
-import time
 from synapse.config._base import ConfigError
 
 from synapse.python_dependencies import (
@@ -33,22 +29,15 @@ from synapse.python_dependencies import (
 from synapse.rest import ClientRestResource
 from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
 from synapse.storage import are_all_users_on_domain
-from synapse.storage.prepare_database import UpgradeDatabaseException
+from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
 
 from synapse.server import HomeServer
 
-
-from twisted.conch.manhole import ColoredManhole
-from twisted.conch.insults import insults
-from twisted.conch import manhole_ssh
-from twisted.cred import checkers, portal
-
-
 from twisted.internet import reactor, task, defer
 from twisted.application import service
 from twisted.web.resource import Resource, EncodingResourceWrapper
 from twisted.web.static import File
-from twisted.web.server import Site, GzipEncoderFactory, Request
+from twisted.web.server import GzipEncoderFactory
 from synapse.http.server import RootRedirect
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.rest.media.v1.media_repository import MediaRepositoryResource
@@ -62,10 +51,18 @@ from synapse.api.urls import (
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.util.logcontext import LoggingContext
+from synapse.metrics import register_memory_metrics, get_metrics_for
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
 from synapse.federation.transport.server import TransportLayerServer
 
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.manhole import manhole
+
+from synapse.http.site import SynapseSite
+
 from synapse import events
 
 from daemonize import Daemonize
@@ -73,9 +70,6 @@ from daemonize import Daemonize
 logger = logging.getLogger("synapse.app.homeserver")
 
 
-ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
-
-
 def gz_wrap(r):
     return EncodingResourceWrapper(r, [GzipEncoderFactory()])
 
@@ -154,7 +148,7 @@ class SynapseHomeServer(HomeServer):
                         MEDIA_PREFIX: media_repo,
                         LEGACY_MEDIA_PREFIX: media_repo,
                         CONTENT_REPO_PREFIX: ContentRepoResource(
-                            self, self.config.uploads_path, self.auth, self.content_addr
+                            self, self.config.uploads_path
                         ),
                     })
 
@@ -173,7 +167,12 @@ class SynapseHomeServer(HomeServer):
                 if name == "replication":
                     resources[REPLICATION_PREFIX] = ReplicationResource(self)
 
-        root_resource = create_resource_tree(resources)
+        if WEB_CLIENT_PREFIX in resources:
+            root_resource = RootRedirect(WEB_CLIENT_PREFIX)
+        else:
+            root_resource = Resource()
+
+        root_resource = create_resource_tree(resources, root_resource)
         if tls:
             reactor.listenSSL(
                 port,
@@ -206,24 +205,13 @@ class SynapseHomeServer(HomeServer):
             if listener["type"] == "http":
                 self._listener_http(config, listener)
             elif listener["type"] == "manhole":
-                checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(
-                    matrix="rabbithole"
-                )
-
-                rlm = manhole_ssh.TerminalRealm()
-                rlm.chainedProtocolFactory = lambda: insults.ServerProtocol(
-                    ColoredManhole,
-                    {
-                        "__name__": "__console__",
-                        "hs": self,
-                    }
-                )
-
-                f = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker]))
-
                 reactor.listenTCP(
                     listener["port"],
-                    f,
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
                     interface=listener.get("bind_address", '127.0.0.1')
                 )
             else:
@@ -245,7 +233,7 @@ class SynapseHomeServer(HomeServer):
         except IncorrectDatabaseSetup as e:
             quit_with_error(e.message)
 
-    def get_db_conn(self):
+    def get_db_conn(self, run_new_connection=True):
         # Any param beginning with cp_ is a parameter for adbapi, and should
         # not be passed to the database engine.
         db_params = {
@@ -254,7 +242,8 @@ class SynapseHomeServer(HomeServer):
         }
         db_conn = self.database_engine.module.connect(**db_params)
 
-        self.database_engine.on_new_connection(db_conn)
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
         return db_conn
 
 
@@ -268,86 +257,6 @@ def quit_with_error(error_string):
     sys.exit(1)
 
 
-def get_version_string():
-    try:
-        null = open(os.devnull, 'w')
-        cwd = os.path.dirname(os.path.abspath(__file__))
-        try:
-            git_branch = subprocess.check_output(
-                ['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
-                stderr=null,
-                cwd=cwd,
-            ).strip()
-            git_branch = "b=" + git_branch
-        except subprocess.CalledProcessError:
-            git_branch = ""
-
-        try:
-            git_tag = subprocess.check_output(
-                ['git', 'describe', '--exact-match'],
-                stderr=null,
-                cwd=cwd,
-            ).strip()
-            git_tag = "t=" + git_tag
-        except subprocess.CalledProcessError:
-            git_tag = ""
-
-        try:
-            git_commit = subprocess.check_output(
-                ['git', 'rev-parse', '--short', 'HEAD'],
-                stderr=null,
-                cwd=cwd,
-            ).strip()
-        except subprocess.CalledProcessError:
-            git_commit = ""
-
-        try:
-            dirty_string = "-this_is_a_dirty_checkout"
-            is_dirty = subprocess.check_output(
-                ['git', 'describe', '--dirty=' + dirty_string],
-                stderr=null,
-                cwd=cwd,
-            ).strip().endswith(dirty_string)
-
-            git_dirty = "dirty" if is_dirty else ""
-        except subprocess.CalledProcessError:
-            git_dirty = ""
-
-        if git_branch or git_tag or git_commit or git_dirty:
-            git_version = ",".join(
-                s for s in
-                (git_branch, git_tag, git_commit, git_dirty,)
-                if s
-            )
-
-            return (
-                "Synapse/%s (%s)" % (
-                    synapse.__version__, git_version,
-                )
-            ).encode("ascii")
-    except Exception as e:
-        logger.info("Failed to check for git repository: %s", e)
-
-    return ("Synapse/%s" % (synapse.__version__,)).encode("ascii")
-
-
-def change_resource_limit(soft_file_no):
-    try:
-        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
-
-        if not soft_file_no:
-            soft_file_no = hard
-
-        resource.setrlimit(resource.RLIMIT_NOFILE, (soft_file_no, hard))
-        logger.info("Set file limit to: %d", soft_file_no)
-
-        resource.setrlimit(
-            resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY)
-        )
-    except (ValueError, resource.error) as e:
-        logger.warn("Failed to set file or core limit: %s", e)
-
-
 def setup(config_options):
     """
     Args:
@@ -358,10 +267,9 @@ def setup(config_options):
         HomeServer
     """
     try:
-        config = HomeServerConfig.load_config(
+        config = HomeServerConfig.load_or_generate_config(
             "Synapse Homeserver",
             config_options,
-            generate_section="Homeserver"
         )
     except ConfigError as e:
         sys.stderr.write("\n" + e.message + "\n")
@@ -377,7 +285,7 @@ def setup(config_options):
     # check any extra requirements we have now we have a config
     check_requirements(config)
 
-    version_string = get_version_string()
+    version_string = "Synapse/" + get_version_string(synapse)
 
     logger.info("Server hostname: %s", config.server_name)
     logger.info("Server version: %s", version_string)
@@ -386,7 +294,7 @@ def setup(config_options):
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
 
-    database_engine = create_engine(config)
+    database_engine = create_engine(config.database_config)
     config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
 
     hs = SynapseHomeServer(
@@ -394,7 +302,6 @@ def setup(config_options):
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
         config=config,
-        content_addr=config.content_addr,
         version_string=version_string,
         database_engine=database_engine,
     )
@@ -402,8 +309,10 @@ def setup(config_options):
     logger.info("Preparing database: %s...", config.database_config['name'])
 
     try:
-        db_conn = hs.get_db_conn()
-        database_engine.prepare_database(db_conn)
+        db_conn = hs.get_db_conn(run_new_connection=False)
+        prepare_database(db_conn, database_engine, config=config)
+        database_engine.on_new_connection(db_conn)
+
         hs.run_startup_checks(db_conn, database_engine)
 
         db_conn.commit()
@@ -427,6 +336,8 @@ def setup(config_options):
         hs.get_datastore().start_doing_background_updates()
         hs.get_replication_layer().start_get_pdu_cache()
 
+        register_memory_metrics(hs)
+
     reactor.callWhenRunning(start)
 
     return hs
@@ -442,215 +353,13 @@ class SynapseService(service.Service):
     def startService(self):
         hs = setup(self.config)
         change_resource_limit(hs.config.soft_file_limit)
+        if hs.config.gc_thresholds:
+            gc.set_threshold(*hs.config.gc_thresholds)
 
     def stopService(self):
         return self._port.stopListening()
 
 
-class SynapseRequest(Request):
-    def __init__(self, site, *args, **kw):
-        Request.__init__(self, *args, **kw)
-        self.site = site
-        self.authenticated_entity = None
-        self.start_time = 0
-
-    def __repr__(self):
-        # We overwrite this so that we don't log ``access_token``
-        return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % (
-            self.__class__.__name__,
-            id(self),
-            self.method,
-            self.get_redacted_uri(),
-            self.clientproto,
-            self.site.site_tag,
-        )
-
-    def get_redacted_uri(self):
-        return ACCESS_TOKEN_RE.sub(
-            r'\1<redacted>\3',
-            self.uri
-        )
-
-    def get_user_agent(self):
-        return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
-
-    def started_processing(self):
-        self.site.access_logger.info(
-            "%s - %s - Received request: %s %s",
-            self.getClientIP(),
-            self.site.site_tag,
-            self.method,
-            self.get_redacted_uri()
-        )
-        self.start_time = int(time.time() * 1000)
-
-    def finished_processing(self):
-
-        try:
-            context = LoggingContext.current_context()
-            ru_utime, ru_stime = context.get_resource_usage()
-            db_txn_count = context.db_txn_count
-            db_txn_duration = context.db_txn_duration
-        except:
-            ru_utime, ru_stime = (0, 0)
-            db_txn_count, db_txn_duration = (0, 0)
-
-        self.site.access_logger.info(
-            "%s - %s - {%s}"
-            " Processed request: %dms (%dms, %dms) (%dms/%d)"
-            " %sB %s \"%s %s %s\" \"%s\"",
-            self.getClientIP(),
-            self.site.site_tag,
-            self.authenticated_entity,
-            int(time.time() * 1000) - self.start_time,
-            int(ru_utime * 1000),
-            int(ru_stime * 1000),
-            int(db_txn_duration * 1000),
-            int(db_txn_count),
-            self.sentLength,
-            self.code,
-            self.method,
-            self.get_redacted_uri(),
-            self.clientproto,
-            self.get_user_agent(),
-        )
-
-    @contextlib.contextmanager
-    def processing(self):
-        self.started_processing()
-        yield
-        self.finished_processing()
-
-
-class XForwardedForRequest(SynapseRequest):
-    def __init__(self, *args, **kw):
-        SynapseRequest.__init__(self, *args, **kw)
-
-    """
-    Add a layer on top of another request that only uses the value of an
-    X-Forwarded-For header as the result of C{getClientIP}.
-    """
-    def getClientIP(self):
-        """
-        @return: The client address (the first address) in the value of the
-            I{X-Forwarded-For header}.  If the header is not present, return
-            C{b"-"}.
-        """
-        return self.requestHeaders.getRawHeaders(
-            b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
-
-
-class SynapseRequestFactory(object):
-    def __init__(self, site, x_forwarded_for):
-        self.site = site
-        self.x_forwarded_for = x_forwarded_for
-
-    def __call__(self, *args, **kwargs):
-        if self.x_forwarded_for:
-            return XForwardedForRequest(self.site, *args, **kwargs)
-        else:
-            return SynapseRequest(self.site, *args, **kwargs)
-
-
-class SynapseSite(Site):
-    """
-    Subclass of a twisted http Site that does access logging with python's
-    standard logging
-    """
-    def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs):
-        Site.__init__(self, resource, *args, **kwargs)
-
-        self.site_tag = site_tag
-
-        proxied = config.get("x_forwarded", False)
-        self.requestFactory = SynapseRequestFactory(self, proxied)
-        self.access_logger = logging.getLogger(logger_name)
-
-    def log(self, request):
-        pass
-
-
-def create_resource_tree(desired_tree, redirect_root_to_web_client=True):
-    """Create the resource tree for this Home Server.
-
-    This in unduly complicated because Twisted does not support putting
-    child resources more than 1 level deep at a time.
-
-    Args:
-        web_client (bool): True to enable the web client.
-        redirect_root_to_web_client (bool): True to redirect '/' to the
-        location of the web client. This does nothing if web_client is not
-        True.
-    """
-    if redirect_root_to_web_client and WEB_CLIENT_PREFIX in desired_tree:
-        root_resource = RootRedirect(WEB_CLIENT_PREFIX)
-    else:
-        root_resource = Resource()
-
-    # ideally we'd just use getChild and putChild but getChild doesn't work
-    # unless you give it a Request object IN ADDITION to the name :/ So
-    # instead, we'll store a copy of this mapping so we can actually add
-    # extra resources to existing nodes. See self._resource_id for the key.
-    resource_mappings = {}
-    for full_path, res in desired_tree.items():
-        logger.info("Attaching %s to path %s", res, full_path)
-        last_resource = root_resource
-        for path_seg in full_path.split('/')[1:-1]:
-            if path_seg not in last_resource.listNames():
-                # resource doesn't exist, so make a "dummy resource"
-                child_resource = Resource()
-                last_resource.putChild(path_seg, child_resource)
-                res_id = _resource_id(last_resource, path_seg)
-                resource_mappings[res_id] = child_resource
-                last_resource = child_resource
-            else:
-                # we have an existing Resource, use that instead.
-                res_id = _resource_id(last_resource, path_seg)
-                last_resource = resource_mappings[res_id]
-
-        # ===========================
-        # now attach the actual desired resource
-        last_path_seg = full_path.split('/')[-1]
-
-        # if there is already a resource here, thieve its children and
-        # replace it
-        res_id = _resource_id(last_resource, last_path_seg)
-        if res_id in resource_mappings:
-            # there is a dummy resource at this path already, which needs
-            # to be replaced with the desired resource.
-            existing_dummy_resource = resource_mappings[res_id]
-            for child_name in existing_dummy_resource.listNames():
-                child_res_id = _resource_id(
-                    existing_dummy_resource, child_name
-                )
-                child_resource = resource_mappings[child_res_id]
-                # steal the children
-                res.putChild(child_name, child_resource)
-
-        # finally, insert the desired resource in the right place
-        last_resource.putChild(last_path_seg, res)
-        res_id = _resource_id(last_resource, last_path_seg)
-        resource_mappings[res_id] = res
-
-    return root_resource
-
-
-def _resource_id(resource, path_seg):
-    """Construct an arbitrary resource ID so you can retrieve the mapping
-    later.
-
-    If you want to represent resource A putChild resource B with path C,
-    the mapping should looks like _resource_id(A,C) = B.
-
-    Args:
-        resource (Resource): The *parent* Resourceb
-        path_seg (str): The name of the child Resource to be attached.
-    Returns:
-        str: A unique string which can be a key to the child Resource.
-    """
-    return "%s-%s" % (resource, path_seg)
-
-
 def run(hs):
     PROFILE_SYNAPSE = False
     if PROFILE_SYNAPSE:
@@ -676,6 +385,8 @@ def run(hs):
 
     start_time = hs.get_clock().time()
 
+    stats = {}
+
     @defer.inlineCallbacks
     def phone_stats_home():
         logger.info("Gathering stats for reporting")
@@ -684,7 +395,10 @@ def run(hs):
         if uptime < 0:
             uptime = 0
 
-        stats = {}
+        # If the stats directory is empty then this is the first time we've
+        # reported stats.
+        first_time = not stats
+
         stats["homeserver"] = hs.config.server_name
         stats["timestamp"] = now
         stats["uptime_seconds"] = uptime
@@ -697,6 +411,25 @@ def run(hs):
         daily_messages = yield hs.get_datastore().count_daily_messages()
         if daily_messages is not None:
             stats["daily_messages"] = daily_messages
+        else:
+            stats.pop("daily_messages", None)
+
+        if first_time:
+            # Add callbacks to report the synapse stats as metrics whenever
+            # prometheus requests them, typically every 30s.
+            # As some of the stats are expensive to calculate we only update
+            # them when synapse phones home to matrix.org every 24 hours.
+            metrics = get_metrics_for("synapse.usage")
+            metrics.add_callback("timestamp", lambda: stats["timestamp"])
+            metrics.add_callback("uptime_seconds", lambda: stats["uptime_seconds"])
+            metrics.add_callback("total_users", lambda: stats["total_users"])
+            metrics.add_callback("total_room_count", lambda: stats["total_room_count"])
+            metrics.add_callback(
+                "daily_active_users", lambda: stats["daily_active_users"]
+            )
+            metrics.add_callback(
+                "daily_messages", lambda: stats.get("daily_messages", 0)
+            )
 
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
@@ -717,6 +450,8 @@ def run(hs):
         # sys.settrace(logcontext_tracer)
         with LoggingContext("run"):
             change_resource_limit(hs.config.soft_file_limit)
+            if hs.config.gc_thresholds:
+                gc.set_threshold(*hs.config.gc_thresholds)
             reactor.run()
 
     if hs.config.daemonize:
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
new file mode 100644
index 0000000000..44c19a1bef
--- /dev/null
+++ b/synapse/app/media_repository.py
@@ -0,0 +1,217 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.rest.media.v0.content_repository import ContentRepoResource
+from synapse.rest.media.v1.media_repository import MediaRepositoryResource
+from synapse.server import HomeServer
+from synapse.storage.client_ips import ClientIpStore
+from synapse.storage.engines import create_engine
+from synapse.storage.media_repository import MediaRepositoryStore
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.api.urls import (
+    CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+)
+from synapse.crypto import context_factory
+
+from synapse import events
+
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.media_repository")
+
+
+class MediaRepositorySlavedStore(
+    SlavedApplicationServiceStore,
+    SlavedRegistrationStore,
+    BaseSlavedStore,
+    MediaRepositoryStore,
+    ClientIpStore,
+):
+    pass
+
+
+class MediaRepositoryServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "media":
+                    media_repo = MediaRepositoryResource(self)
+                    resources.update({
+                        MEDIA_PREFIX: media_repo,
+                        LEGACY_MEDIA_PREFIX: media_repo,
+                        CONTENT_REPO_PREFIX: ContentRepoResource(
+                            self, self.config.uploads_path
+                        ),
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse media repository now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(5)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse media repository", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.media_repository"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ss = MediaRepositoryServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ss.setup()
+    ss.get_handlers()
+    ss.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ss.get_state_handler().start_caching()
+        ss.get_datastore().start_profiling()
+        ss.replicate()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-media-repository",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
new file mode 100644
index 0000000000..a0e765c54f
--- /dev/null
+++ b/synapse/app/pusher.py
@@ -0,0 +1,303 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.storage.roommember import RoomMemberStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.storage.engines import create_engine
+from synapse.storage import DataStore
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+
+from synapse import events
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.pusher")
+
+
+class PusherSlaveStore(
+    SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
+    SlavedAccountDataStore
+):
+    update_pusher_last_stream_ordering_and_success = (
+        DataStore.update_pusher_last_stream_ordering_and_success.__func__
+    )
+
+    update_pusher_failing_since = (
+        DataStore.update_pusher_failing_since.__func__
+    )
+
+    update_pusher_last_stream_ordering = (
+        DataStore.update_pusher_last_stream_ordering.__func__
+    )
+
+    get_throttle_params_by_room = (
+        DataStore.get_throttle_params_by_room.__func__
+    )
+
+    set_throttle_params = (
+        DataStore.set_throttle_params.__func__
+    )
+
+    get_time_of_last_push_action_before = (
+        DataStore.get_time_of_last_push_action_before.__func__
+    )
+
+    get_profile_displayname = (
+        DataStore.get_profile_displayname.__func__
+    )
+
+    who_forgot_in_room = (
+        RoomMemberStore.__dict__["who_forgot_in_room"]
+    )
+
+
+class PusherServer(HomeServer):
+
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = PusherSlaveStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def remove_pusher(self, app_id, push_key, user_id):
+        http_client = self.get_simple_http_client()
+        replication_url = self.config.worker_replication_url
+        url = replication_url + "/remove_pushers"
+        return http_client.post_json_get_json(url, {
+            "remove": [{
+                "app_id": app_id,
+                "push_key": push_key,
+                "user_id": user_id,
+            }]
+        })
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse pusher now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+        pusher_pool = self.get_pusherpool()
+
+        def stop_pusher(user_id, app_id, pushkey):
+            key = "%s:%s" % (app_id, pushkey)
+            pushers_for_user = pusher_pool.pushers.get(user_id, {})
+            pusher = pushers_for_user.pop(key, None)
+            if pusher is None:
+                return
+            logger.info("Stopping pusher %r / %r", user_id, key)
+            pusher.on_stop()
+
+        def start_pusher(user_id, app_id, pushkey):
+            key = "%s:%s" % (app_id, pushkey)
+            logger.info("Starting pusher %r / %r", user_id, key)
+            return pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+
+        @defer.inlineCallbacks
+        def poke_pushers(results):
+            pushers_rows = set(
+                map(tuple, results.get("pushers", {}).get("rows", []))
+            )
+            deleted_pushers_rows = set(
+                map(tuple, results.get("deleted_pushers", {}).get("rows", []))
+            )
+            for row in sorted(pushers_rows | deleted_pushers_rows):
+                if row in deleted_pushers_rows:
+                    user_id, app_id, pushkey = row[1:4]
+                    stop_pusher(user_id, app_id, pushkey)
+                elif row in pushers_rows:
+                    user_id = row[1]
+                    app_id = row[5]
+                    pushkey = row[8]
+                    yield start_pusher(user_id, app_id, pushkey)
+
+            stream = results.get("events")
+            if stream and stream["rows"]:
+                min_stream_id = stream["rows"][0][0]
+                max_stream_id = stream["position"]
+                preserve_fn(pusher_pool.on_new_notifications)(
+                    min_stream_id, max_stream_id
+                )
+
+            stream = results.get("receipts")
+            if stream and stream["rows"]:
+                rows = stream["rows"]
+                affected_room_ids = set(row[1] for row in rows)
+                min_stream_id = rows[0][0]
+                max_stream_id = stream["position"]
+                preserve_fn(pusher_pool.on_new_receipts)(
+                    min_stream_id, max_stream_id, affected_room_ids
+                )
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+                poke_pushers(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(30)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse pusher", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.pusher"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    if config.start_pushers:
+        sys.stderr.write(
+            "\nThe pushers must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``start_pushers: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
+
+    # Force the pushers to start since they will be disabled in the main config
+    config.start_pushers = True
+
+    database_engine = create_engine(config.database_config)
+
+    ps = PusherServer(
+        config.server_name,
+        db_config=config.database_config,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ps.setup()
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ps.replicate()
+        ps.get_pusherpool().start()
+        ps.get_datastore().start_profiling()
+        ps.get_state_handler().start_caching()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-pusher",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        ps = start(sys.argv[1:])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
new file mode 100644
index 0000000000..bf1b995dc2
--- /dev/null
+++ b/synapse/app/synchrotron.py
@@ -0,0 +1,496 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.api.constants import EventTypes, PresenceState
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.events import FrozenEvent
+from synapse.handlers.presence import PresenceHandler
+from synapse.http.site import SynapseSite
+from synapse.http.server import JsonResource
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.rest.client.v2_alpha import sync
+from synapse.rest.client.v1 import events
+from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
+from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.server import HomeServer
+from synapse.storage.client_ips import ClientIpStore
+from synapse.storage.engines import create_engine
+from synapse.storage.presence import PresenceStore, UserPresenceState
+from synapse.storage.roommember import RoomMemberStore
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.stringutils import random_string
+from synapse.util.versionstring import get_version_string
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import contextlib
+import gc
+import ujson as json
+
+logger = logging.getLogger("synapse.app.synchrotron")
+
+
+class SynchrotronSlavedStore(
+    SlavedPushRuleStore,
+    SlavedEventStore,
+    SlavedReceiptsStore,
+    SlavedAccountDataStore,
+    SlavedApplicationServiceStore,
+    SlavedRegistrationStore,
+    SlavedFilteringStore,
+    SlavedPresenceStore,
+    SlavedDeviceInboxStore,
+    RoomStore,
+    BaseSlavedStore,
+    ClientIpStore,  # After BaseSlavedStore because the constructor is different
+):
+    who_forgot_in_room = (
+        RoomMemberStore.__dict__["who_forgot_in_room"]
+    )
+
+    # XXX: This is a bit broken because we don't persist the accepted list in a
+    # way that can be replicated. This means that we don't have a way to
+    # invalidate the cache correctly.
+    get_presence_list_accepted = PresenceStore.__dict__[
+        "get_presence_list_accepted"
+    ]
+    get_presence_list_observers_accepted = PresenceStore.__dict__[
+        "get_presence_list_observers_accepted"
+    ]
+
+
+UPDATE_SYNCING_USERS_MS = 10 * 1000
+
+
+class SynchrotronPresence(object):
+    def __init__(self, hs):
+        self.is_mine_id = hs.is_mine_id
+        self.http_client = hs.get_simple_http_client()
+        self.store = hs.get_datastore()
+        self.user_to_num_current_syncs = {}
+        self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
+        self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
+
+        active_presence = self.store.take_presence_startup_info()
+        self.user_to_current_state = {
+            state.user_id: state
+            for state in active_presence
+        }
+
+        self.process_id = random_string(16)
+        logger.info("Presence process_id is %r", self.process_id)
+
+        self._sending_sync = False
+        self._need_to_send_sync = False
+        self.clock.looping_call(
+            self._send_syncing_users_regularly,
+            UPDATE_SYNCING_USERS_MS,
+        )
+
+        reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+
+    def set_state(self, user, state, ignore_status_msg=False):
+        # TODO Hows this supposed to work?
+        pass
+
+    get_states = PresenceHandler.get_states.__func__
+    get_state = PresenceHandler.get_state.__func__
+    _get_interested_parties = PresenceHandler._get_interested_parties.__func__
+    current_state_for_users = PresenceHandler.current_state_for_users.__func__
+
+    @defer.inlineCallbacks
+    def user_syncing(self, user_id, affect_presence):
+        if affect_presence:
+            curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
+            self.user_to_num_current_syncs[user_id] = curr_sync + 1
+            prev_states = yield self.current_state_for_users([user_id])
+            if prev_states[user_id].state == PresenceState.OFFLINE:
+                # TODO: Don't block the sync request on this HTTP hit.
+                yield self._send_syncing_users_now()
+
+        def _end():
+            # We check that the user_id is in user_to_num_current_syncs because
+            # user_to_num_current_syncs may have been cleared if we are
+            # shutting down.
+            if affect_presence and user_id in self.user_to_num_current_syncs:
+                self.user_to_num_current_syncs[user_id] -= 1
+
+        @contextlib.contextmanager
+        def _user_syncing():
+            try:
+                yield
+            finally:
+                _end()
+
+        defer.returnValue(_user_syncing())
+
+    @defer.inlineCallbacks
+    def _on_shutdown(self):
+        # When the synchrotron is shutdown tell the master to clear the in
+        # progress syncs for this process
+        self.user_to_num_current_syncs.clear()
+        yield self._send_syncing_users_now()
+
+    def _send_syncing_users_regularly(self):
+        # Only send an update if we aren't in the middle of sending one.
+        if not self._sending_sync:
+            preserve_fn(self._send_syncing_users_now)()
+
+    @defer.inlineCallbacks
+    def _send_syncing_users_now(self):
+        if self._sending_sync:
+            # We don't want to race with sending another update.
+            # Instead we wait for that update to finish and send another
+            # update afterwards.
+            self._need_to_send_sync = True
+            return
+
+        # Flag that we are sending an update.
+        self._sending_sync = True
+
+        yield self.http_client.post_json_get_json(self.syncing_users_url, {
+            "process_id": self.process_id,
+            "syncing_users": [
+                user_id for user_id, count in self.user_to_num_current_syncs.items()
+                if count > 0
+            ],
+        })
+
+        # Unset the flag as we are no longer sending an update.
+        self._sending_sync = False
+        if self._need_to_send_sync:
+            # If something happened while we were sending the update then
+            # we might need to send another update.
+            # TODO: Check if the update that was sent matches the current state
+            # as we only need to send an update if they are different.
+            self._need_to_send_sync = False
+            yield self._send_syncing_users_now()
+
+    @defer.inlineCallbacks
+    def notify_from_replication(self, states, stream_id):
+        parties = yield self._get_interested_parties(
+            states, calculate_remote_hosts=False
+        )
+        room_ids_to_states, users_to_states, _ = parties
+
+        self.notifier.on_new_event(
+            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+            users=users_to_states.keys()
+        )
+
+    @defer.inlineCallbacks
+    def process_replication(self, result):
+        stream = result.get("presence", {"rows": []})
+        states = []
+        for row in stream["rows"]:
+            (
+                position, user_id, state, last_active_ts,
+                last_federation_update_ts, last_user_sync_ts, status_msg,
+                currently_active
+            ) = row
+            state = UserPresenceState(
+                user_id, state, last_active_ts,
+                last_federation_update_ts, last_user_sync_ts, status_msg,
+                currently_active
+            )
+            self.user_to_current_state[user_id] = state
+            states.append(state)
+
+        if states and "position" in stream:
+            stream_id = int(stream["position"])
+            yield self.notify_from_replication(states, stream_id)
+
+
+class SynchrotronTyping(object):
+    def __init__(self, hs):
+        self._latest_room_serial = 0
+        self._room_serials = {}
+        self._room_typing = {}
+
+    def stream_positions(self):
+        # We must update this typing token from the response of the previous
+        # sync. In particular, the stream id may "reset" back to zero/a low
+        # value which we *must* use for the next replication request.
+        return {"typing": self._latest_room_serial}
+
+    def process_replication(self, result):
+        stream = result.get("typing")
+        if stream:
+            self._latest_room_serial = int(stream["position"])
+
+            for row in stream["rows"]:
+                position, room_id, typing_json = row
+                typing = json.loads(typing_json)
+                self._room_serials[room_id] = position
+                self._room_typing[room_id] = typing
+
+
+class SynchrotronApplicationService(object):
+    def notify_interested_services(self, event):
+        pass
+
+
+class SynchrotronServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "client":
+                    resource = JsonResource(self, canonical_json=False)
+                    sync.register_servlets(self, resource)
+                    events.register_servlets(self, resource)
+                    InitialSyncRestServlet(self).register(resource)
+                    RoomInitialSyncRestServlet(self).register(resource)
+                    resources.update({
+                        "/_matrix/client/r0": resource,
+                        "/_matrix/client/unstable": resource,
+                        "/_matrix/client/v2_alpha": resource,
+                        "/_matrix/client/api/v1": resource,
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse synchrotron now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+        notifier = self.get_notifier()
+        presence_handler = self.get_presence_handler()
+        typing_handler = self.get_typing_handler()
+
+        def notify_from_stream(
+            result, stream_name, stream_key, room=None, user=None
+        ):
+            stream = result.get(stream_name)
+            if stream:
+                position_index = stream["field_names"].index("position")
+                if room:
+                    room_index = stream["field_names"].index(room)
+                if user:
+                    user_index = stream["field_names"].index(user)
+
+                users = ()
+                rooms = ()
+                for row in stream["rows"]:
+                    position = row[position_index]
+
+                    if user:
+                        users = (row[user_index],)
+
+                    if room:
+                        rooms = (row[room_index],)
+
+                    notifier.on_new_event(
+                        stream_key, position, users=users, rooms=rooms
+                    )
+
+        def notify(result):
+            stream = result.get("events")
+            if stream:
+                max_position = stream["position"]
+                for row in stream["rows"]:
+                    position = row[0]
+                    internal = json.loads(row[1])
+                    event_json = json.loads(row[2])
+                    event = FrozenEvent(event_json, internal_metadata_dict=internal)
+                    extra_users = ()
+                    if event.type == EventTypes.Member:
+                        extra_users = (event.state_key,)
+                    notifier.on_new_room_event(
+                        event, position, max_position, extra_users
+                    )
+
+            notify_from_stream(
+                result, "push_rules", "push_rules_key", user="user_id"
+            )
+            notify_from_stream(
+                result, "user_account_data", "account_data_key", user="user_id"
+            )
+            notify_from_stream(
+                result, "room_account_data", "account_data_key", user="user_id"
+            )
+            notify_from_stream(
+                result, "tag_account_data", "account_data_key", user="user_id"
+            )
+            notify_from_stream(
+                result, "receipts", "receipt_key", room="room_id"
+            )
+            notify_from_stream(
+                result, "typing", "typing_key", room="room_id"
+            )
+            notify_from_stream(
+                result, "to_device", "to_device_key", user="user_id"
+            )
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args.update(typing_handler.stream_positions())
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+                typing_handler.process_replication(result)
+                yield presence_handler.process_replication(result)
+                notify(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(5)
+
+    def build_presence_handler(self):
+        return SynchrotronPresence(self)
+
+    def build_typing_handler(self):
+        return SynchrotronTyping(self)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse synchrotron", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.synchrotron"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    ss = SynchrotronServer(
+        config.server_name,
+        db_config=config.database_config,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+        application_service_handler=SynchrotronApplicationService(),
+    )
+
+    ss.setup()
+    ss.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ss.get_datastore().start_profiling()
+        ss.replicate()
+        ss.get_state_handler().start_caching()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-synchrotron",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index ab3a31d7b7..c045588866 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -14,70 +14,198 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import sys
+import argparse
+import collections
+import glob
 import os
 import os.path
-import subprocess
 import signal
+import subprocess
+import sys
 import yaml
 
-SYNAPSE = ["python", "-B", "-m", "synapse.app.homeserver"]
+SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
 
 GREEN = "\x1b[1;32m"
 RED = "\x1b[1;31m"
 NORMAL = "\x1b[m"
 
 
+def write(message, colour=NORMAL, stream=sys.stdout):
+    if colour == NORMAL:
+        stream.write(message + "\n")
+    else:
+        stream.write(colour + message + NORMAL + "\n")
+
+
 def start(configfile):
-    print ("Starting ...")
+    write("Starting ...")
     args = SYNAPSE
     args.extend(["--daemonize", "-c", configfile])
 
     try:
         subprocess.check_call(args)
-        print (GREEN + "started" + NORMAL)
+        write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
+    except subprocess.CalledProcessError as e:
+        write(
+            "error starting (exit code: %d); see above for logs" % e.returncode,
+            colour=RED,
+        )
+
+
+def start_worker(app, configfile, worker_configfile):
+    args = [
+        "python", "-B",
+        "-m", app,
+        "-c", configfile,
+        "-c", worker_configfile
+    ]
+
+    try:
+        subprocess.check_call(args)
+        write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
     except subprocess.CalledProcessError as e:
-        print (
-            RED +
-            "error starting (exit code: %d); see above for logs" % e.returncode +
-            NORMAL
+        write(
+            "error starting %s(%r) (exit code: %d); see above for logs" % (
+                app, worker_configfile, e.returncode,
+            ),
+            colour=RED,
         )
 
 
-def stop(pidfile):
+def stop(pidfile, app):
     if os.path.exists(pidfile):
         pid = int(open(pidfile).read())
         os.kill(pid, signal.SIGTERM)
-        print (GREEN + "stopped" + NORMAL)
+        write("stopped %s" % (app,), colour=GREEN)
+
+
+Worker = collections.namedtuple("Worker", [
+    "app", "configfile", "pidfile", "cache_factor"
+])
 
 
 def main():
-    configfile = sys.argv[2] if len(sys.argv) == 3 else "homeserver.yaml"
+
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "action",
+        choices=["start", "stop", "restart"],
+        help="whether to start, stop or restart the synapse",
+    )
+    parser.add_argument(
+        "configfile",
+        nargs="?",
+        default="homeserver.yaml",
+        help="the homeserver config file, defaults to homserver.yaml",
+    )
+    parser.add_argument(
+        "-w", "--worker",
+        metavar="WORKERCONFIG",
+        help="start or stop a single worker",
+    )
+    parser.add_argument(
+        "-a", "--all-processes",
+        metavar="WORKERCONFIGDIR",
+        help="start or stop all the workers in the given directory"
+             " and the main synapse process",
+    )
+
+    options = parser.parse_args()
+
+    if options.worker and options.all_processes:
+        write(
+            'Cannot use "--worker" with "--all-processes"',
+            stream=sys.stderr
+        )
+        sys.exit(1)
+
+    configfile = options.configfile
 
     if not os.path.exists(configfile):
-        sys.stderr.write(
+        write(
             "No config file found\n"
             "To generate a config file, run '%s -c %s --generate-config"
             " --server-name=<server name>'\n" % (
-                " ".join(SYNAPSE), configfile
-            )
+                " ".join(SYNAPSE), options.configfile
+            ),
+            stream=sys.stderr,
         )
         sys.exit(1)
 
-    config = yaml.load(open(configfile))
+    with open(configfile) as stream:
+        config = yaml.load(stream)
+
     pidfile = config["pid_file"]
+    cache_factor = config.get("synctl_cache_factor")
+    start_stop_synapse = True
 
-    action = sys.argv[1] if sys.argv[1:] else "usage"
-    if action == "start":
-        start(configfile)
-    elif action == "stop":
-        stop(pidfile)
-    elif action == "restart":
-        stop(pidfile)
-        start(configfile)
-    else:
-        sys.stderr.write("Usage: %s [start|stop|restart] [configfile]\n" % (sys.argv[0],))
-        sys.exit(1)
+    if cache_factor:
+        os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+
+    worker_configfiles = []
+    if options.worker:
+        start_stop_synapse = False
+        worker_configfile = options.worker
+        if not os.path.exists(worker_configfile):
+            write(
+                "No worker config found at %r" % (worker_configfile,),
+                stream=sys.stderr,
+            )
+            sys.exit(1)
+        worker_configfiles.append(worker_configfile)
+
+    if options.all_processes:
+        worker_configdir = options.all_processes
+        if not os.path.isdir(worker_configdir):
+            write(
+                "No worker config directory found at %r" % (worker_configdir,),
+                stream=sys.stderr,
+            )
+            sys.exit(1)
+        worker_configfiles.extend(sorted(glob.glob(
+            os.path.join(worker_configdir, "*.yaml")
+        )))
+
+    workers = []
+    for worker_configfile in worker_configfiles:
+        with open(worker_configfile) as stream:
+            worker_config = yaml.load(stream)
+        worker_app = worker_config["worker_app"]
+        worker_pidfile = worker_config["worker_pid_file"]
+        worker_daemonize = worker_config["worker_daemonize"]
+        assert worker_daemonize  # TODO print something more user friendly
+        worker_cache_factor = worker_config.get("synctl_cache_factor")
+        workers.append(Worker(
+            worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
+        ))
+
+    action = options.action
+
+    if action == "stop" or action == "restart":
+        for worker in workers:
+            stop(worker.pidfile, worker.app)
+
+        if start_stop_synapse:
+            stop(pidfile, "synapse.app.homeserver")
+
+        # TODO: Wait for synapse to actually shutdown before starting it again
+
+    if action == "start" or action == "restart":
+        if start_stop_synapse:
+            start(configfile)
+
+        for worker in workers:
+            if worker.cache_factor:
+                os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
+
+            start_worker(worker.app, configfile, worker.configfile)
+
+            if cache_factor:
+                os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+            else:
+                os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
 
 
 if __name__ == "__main__":