summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/_base.py99
-rw-r--r--synapse/app/appservice.py50
-rw-r--r--synapse/app/client_reader.py53
-rw-r--r--synapse/app/federation_reader.py53
-rw-r--r--synapse/app/federation_sender.py57
-rw-r--r--synapse/app/frontend_proxy.py64
-rwxr-xr-xsynapse/app/homeserver.py114
-rw-r--r--synapse/app/media_repository.py53
-rw-r--r--synapse/app/pusher.py57
-rw-r--r--synapse/app/synchrotron.py69
-rw-r--r--synapse/app/user_dir.py53
-rw-r--r--synapse/config/server.py12
-rw-r--r--synapse/config/workers.py1
-rw-r--r--synapse/python_dependencies.py1
14 files changed, 271 insertions, 465 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
new file mode 100644
index 0000000000..cd0e815919
--- /dev/null
+++ b/synapse/app/_base.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import gc
+import logging
+
+import affinity
+from daemonize import Daemonize
+from synapse.util import PreserveLoggingContext
+from synapse.util.rlimit import change_resource_limit
+from twisted.internet import reactor
+
+
+def start_worker_reactor(appname, config):
+    """ Run the reactor in the main process
+
+    Daemonizes if necessary, and then configures some resources, before starting
+    the reactor. Pulls configuration from the 'worker' settings in 'config'.
+
+    Args:
+        appname (str): application name which will be sent to syslog
+        config (synapse.config.Config): config object
+    """
+
+    logger = logging.getLogger(config.worker_app)
+
+    start_reactor(
+        appname,
+        config.soft_file_limit,
+        config.gc_thresholds,
+        config.worker_pid_file,
+        config.worker_daemonize,
+        config.worker_cpu_affinity,
+        logger,
+    )
+
+
+def start_reactor(
+        appname,
+        soft_file_limit,
+        gc_thresholds,
+        pid_file,
+        daemonize,
+        cpu_affinity,
+        logger,
+):
+    """ Run the reactor in the main process
+
+    Daemonizes if necessary, and then configures some resources, before starting
+    the reactor
+
+    Args:
+        appname (str): application name which will be sent to syslog
+        soft_file_limit (int):
+        gc_thresholds:
+        pid_file (str): name of pid file to write to if daemonize is True
+        daemonize (bool): true to run the reactor in a background process
+        cpu_affinity (int|None): cpu affinity mask
+        logger (logging.Logger): logger instance to pass to Daemonize
+    """
+
+    def run():
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
+            logger.info("Running")
+            if cpu_affinity is not None:
+                logger.info("Setting CPU affinity to %s" % cpu_affinity)
+                affinity.set_process_affinity_mask(0, cpu_affinity)
+            change_resource_limit(soft_file_limit)
+            if gc_thresholds:
+                gc.set_threshold(*gc_thresholds)
+            reactor.run()
+
+    if daemonize:
+        daemon = Daemonize(
+            app=appname,
+            pid=pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 9a476efa63..ba2657bbad 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -13,38 +13,31 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.appservice")
 
 
@@ -181,36 +174,13 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-appservice",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-appservice", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 09bc1935f1..129cfa901f 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -13,47 +13,39 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
-from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import PublicRoomListRestServlet
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.client_reader")
 
 
@@ -183,36 +175,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-client-reader",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-client-reader", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index eb392e1c9d..40cebe6f4a 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -13,44 +13,36 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.api.urls import FEDERATION_PREFIX
-from synapse.federation.transport.server import TransportLayerServer
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.federation_reader")
 
 
@@ -172,36 +164,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-federation-reader",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-federation-reader", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 03327dc47a..389e3909d1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -13,44 +13,37 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
 from synapse.federation import send_queue
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.federation_sender")
 
 
@@ -213,36 +206,12 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
-
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-federation-sender",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-federation-sender", config)
 
 
 class FederationSenderHandler(object):
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 132f18a979..bee4c47498 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -13,48 +13,39 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.errors import SynapseError
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.servlet import (
+    RestServlet, parse_json_object_from_request,
+)
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v2_alpha._base import client_v2_patterns
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-from synapse.api.errors import SynapseError
-from synapse.http.servlet import (
-    RestServlet, parse_json_object_from_request,
-)
-from synapse.rest.client.v2_alpha._base import client_v2_patterns
-
-from synapse import events
-
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
-
 logger = logging.getLogger("synapse.app.frontend_proxy")
 
 
@@ -234,36 +225,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-frontend-proxy",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-frontend-proxy", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 081e7cce59..84ad8f04a0 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -13,61 +13,48 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import synapse
-
 import gc
 import logging
 import os
 import sys
 
+import synapse
 import synapse.config.logger
+from synapse import events
+from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
+    LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
+    STATIC_PREFIX, WEB_CLIENT_PREFIX
+from synapse.app import _base
 from synapse.config._base import ConfigError
-
-from synapse.python_dependencies import (
-    check_requirements, CONDITIONAL_REQUIREMENTS
-)
-
-from synapse.rest import ClientRestResource
-from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
-from synapse.storage import are_all_users_on_domain
-from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
-
-from synapse.server import HomeServer
-
-from twisted.internet import reactor, defer
-from twisted.application import service
-from twisted.web.resource import Resource, EncodingResourceWrapper
-from twisted.web.static import File
-from twisted.web.server import GzipEncoderFactory
-from synapse.http.server import RootRedirect
-from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
-from synapse.rest.key.v1.server_key_resource import LocalKey
-from synapse.rest.key.v2 import KeyApiV2Resource
-from synapse.api.urls import (
-    FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
-    SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX,
-    SERVER_KEY_V2_PREFIX,
-)
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.federation.transport.server import TransportLayerServer
+from synapse.http.server import RootRedirect
+from synapse.http.site import SynapseSite
 from synapse.metrics import register_memory_metrics
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
+    check_requirements
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-from synapse.federation.transport.server import TransportLayerServer
-
-from synapse.util.rlimit import change_resource_limit
-from synapse.util.versionstring import get_version_string
+from synapse.rest import ClientRestResource
+from synapse.rest.key.v1.server_key_resource import LocalKey
+from synapse.rest.key.v2 import KeyApiV2Resource
+from synapse.rest.media.v0.content_repository import ContentRepoResource
+from synapse.rest.media.v1.media_repository import MediaRepositoryResource
+from synapse.server import HomeServer
+from synapse.storage import are_all_users_on_domain
+from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
+from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
 from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-
-from synapse.http.site import SynapseSite
-
-from synapse import events
-
-from daemonize import Daemonize
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.server import GzipEncoderFactory
+from twisted.web.static import File
 
 logger = logging.getLogger("synapse.app.homeserver")
 
@@ -446,37 +433,18 @@ def run(hs):
         # be quite busy the first few minutes
         clock.call_later(5 * 60, phone_stats_home)
 
-    def in_thread():
-        # Uncomment to enable tracing of log context changes.
-        # sys.settrace(logcontext_tracer)
-
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            change_resource_limit(hs.config.soft_file_limit)
-            if hs.config.gc_thresholds:
-                gc.set_threshold(*hs.config.gc_thresholds)
-            reactor.run()
-
-    if hs.config.daemonize:
-
-        if hs.config.print_pidfile:
-            print (hs.config.pid_file)
-
-        daemon = Daemonize(
-            app="synapse-homeserver",
-            pid=hs.config.pid_file,
-            action=lambda: in_thread(),
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-
-        daemon.start()
-    else:
-        in_thread()
+    if hs.config.daemonize and hs.config.print_pidfile:
+        print (hs.config.pid_file)
+
+    _base.start_reactor(
+        "synapse-homeserver",
+        hs.config.soft_file_limit,
+        hs.config.gc_thresholds,
+        hs.config.pid_file,
+        hs.config.daemonize,
+        hs.config.cpu_affinity,
+        logger,
+    )
 
 
 def main():
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index f57ec784fe..36c18bdbcb 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -13,14 +13,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.urls import (
+    CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+)
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -33,27 +40,12 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.api.urls import (
-    CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
-)
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.media_repository")
 
 
@@ -180,36 +172,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-media-repository",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-media-repository", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index f9114acfcb..db9a4d16f4 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -13,41 +13,33 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.storage.roommember import RoomMemberStore
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
-from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.storage.engines import create_engine
+from synapse.server import HomeServer
 from synapse.storage import DataStore
+from synapse.storage.engines import create_engine
+from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn, \
-    PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.pusher")
 
 
@@ -244,18 +236,6 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_pusherpool().start()
         ps.get_datastore().start_profiling()
@@ -263,18 +243,7 @@ def start(config_options):
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-pusher",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-pusher", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 4bdd99a966..80e4ba5336 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -13,56 +13,50 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import contextlib
+import logging
+import sys
 
 import synapse
-
 from synapse.api.constants import EventTypes
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.handlers.presence import PresenceHandler, get_interested_parties
-from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.rest.client.v2_alpha import sync
-from synapse.rest.client.v1 import events
-from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
-from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
-from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
-from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.filtering import SlavedFilteringStore
-from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1 import events
+from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
+from synapse.rest.client.v2_alpha import sync
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import contextlib
-import gc
-
 logger = logging.getLogger("synapse.app.synchrotron")
 
 
@@ -440,36 +434,13 @@ def start(config_options):
     ss.setup()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_datastore().start_profiling()
         ss.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-synchrotron",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-synchrotron", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 8c6300db9d..cd743887ce 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -14,16 +14,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import synapse
+import logging
+import sys
 
-from synapse.server import HomeServer
+import synapse
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -31,26 +34,17 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v2_alpha import user_directory
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.user_directory import UserDirectoryStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-from synapse import events
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.user_dir")
 
 
@@ -233,36 +227,13 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-user-dir",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-user-dir")
 
 
 if __name__ == '__main__':
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 28b4e5f50c..4e4bf6b432 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -29,6 +29,7 @@ class ServerConfig(Config):
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
         self.public_baseurl = config.get("public_baseurl")
+        self.cpu_affinity = config.get("cpu_affinity")
 
         # Whether to send federation traffic out in this process. This only
         # applies to some federation traffic, and so shouldn't be used to
@@ -147,6 +148,17 @@ class ServerConfig(Config):
         # When running as a daemon, the file to store the pid in
         pid_file: %(pid_file)s
 
+        # CPU affinity mask. Setting this restricts the CPUs on which the process
+        # will be scheduled. It is represented as a bitmask, with the lowest order
+        # bit corresponding to the first logical CPU and the highest order bit
+        # corresponding to the last logical CPU. Not all CPUs may exist on a
+        # given system but a mask may specify more CPUs than are present.
+        # For example:
+        #    0x00000001  is processor #0,
+        #    0x00000003  is processors #0 and #1,
+        #    0xFFFFFFFF  is all processors (#0 through #31).
+        # cpu_affinity: 0xFFFFFFFF
+
         # Whether to serve a web client from the HTTP/HTTPS root resource.
         web_client: True
 
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 99d5d8aaeb..c5a5a8919c 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -33,6 +33,7 @@ class WorkerConfig(Config):
         self.worker_name = config.get("worker_name", self.worker_app)
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+        self.worker_cpu_affinity = config.get("worker_cpu_affinity")
 
         if self.worker_listeners:
             for listener in self.worker_listeners:
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index ed7f1c89ad..1d902dc38d 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -40,6 +40,7 @@ REQUIREMENTS = {
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
+    "affinity": ["affinity"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {