summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/app/pusher.py171
-rw-r--r--synapse/app/synchrotron.py137
-rw-r--r--synapse/config/homeserver.py4
-rw-r--r--synapse/config/logger.py102
-rw-r--r--synapse/config/server.py31
-rw-r--r--synapse/config/workers.py31
6 files changed, 181 insertions, 295 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 4ec23d84c1..9ac26d52c6 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -18,10 +18,8 @@ import synapse
 
 from synapse.server import HomeServer
 from synapse.config._base import ConfigError
-from synapse.config.database import DatabaseConfig
-from synapse.config.logger import LoggingConfig
-from synapse.config.emailconfig import EmailConfig
-from synapse.config.key import KeyConfig
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
 from synapse.http.site import SynapseSite
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.storage.roommember import RoomMemberStore
@@ -43,98 +41,12 @@ from twisted.web.resource import Resource
 
 from daemonize import Daemonize
 
-import gc
 import sys
 import logging
 
 logger = logging.getLogger("synapse.app.pusher")
 
 
-class SlaveConfig(DatabaseConfig):
-    def read_config(self, config):
-        self.replication_url = config["replication_url"]
-        self.server_name = config["server_name"]
-        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
-            "use_insecure_ssl_client_just_for_testing_do_not_use", False
-        )
-        self.user_agent_suffix = None
-        self.start_pushers = True
-        self.listeners = config["listeners"]
-        self.soft_file_limit = config.get("soft_file_limit")
-        self.daemonize = config.get("daemonize")
-        self.pid_file = self.abspath(config.get("pid_file"))
-        self.public_baseurl = config["public_baseurl"]
-
-        thresholds = config.get("gc_thresholds", None)
-        if thresholds is not None:
-            try:
-                assert len(thresholds) == 3
-                self.gc_thresholds = (
-                    int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
-                )
-            except:
-                raise ConfigError(
-                    "Value of `gc_threshold` must be a list of three integers if set"
-                )
-        else:
-            self.gc_thresholds = None
-
-        # some things used by the auth handler but not actually used in the
-        # pusher codebase
-        self.bcrypt_rounds = None
-        self.ldap_enabled = None
-        self.ldap_server = None
-        self.ldap_port = None
-        self.ldap_tls = None
-        self.ldap_search_base = None
-        self.ldap_search_property = None
-        self.ldap_email_property = None
-        self.ldap_full_name_property = None
-
-        # We would otherwise try to use the registration shared secret as the
-        # macaroon shared secret if there was no macaroon_shared_secret, but
-        # that means pulling in RegistrationConfig too. We don't need to be
-        # backwards compaitible in the pusher codebase so just make people set
-        # macaroon_shared_secret. We set this to None to prevent it referencing
-        # an undefined key.
-        self.registration_shared_secret = None
-
-    def default_config(self, server_name, **kwargs):
-        pid_file = self.abspath("pusher.pid")
-        return """\
-        # Slave configuration
-
-        # The replication listener on the synapse to talk to.
-        #replication_url: https://localhost:{replication_port}/_synapse/replication
-
-        server_name: "%(server_name)s"
-
-        listeners: []
-        # Enable a ssh manhole listener on the pusher.
-        # - type: manhole
-        #   port: {manhole_port}
-        #   bind_address: 127.0.0.1
-        # Enable a metric listener on the pusher.
-        # - type: http
-        #   port: {metrics_port}
-        #   bind_address: 127.0.0.1
-        #   resources:
-        #    - names: ["metrics"]
-        #      compress: False
-
-        report_stats: False
-
-        daemonize: False
-
-        pid_file: %(pid_file)s
-
-        """ % locals()
-
-
-class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig):
-    pass
-
-
 class PusherSlaveStore(
     SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
     SlavedAccountDataStore
@@ -199,7 +111,7 @@ class PusherServer(HomeServer):
 
     def remove_pusher(self, app_id, push_key, user_id):
         http_client = self.get_simple_http_client()
-        replication_url = self.config.replication_url
+        replication_url = self.config.worker_replication_url
         url = replication_url + "/remove_pushers"
         return http_client.post_json_get_json(url, {
             "remove": [{
@@ -232,8 +144,8 @@ class PusherServer(HomeServer):
         )
         logger.info("Synapse pusher now listening on port %d", port)
 
-    def start_listening(self):
-        for listener in self.config.listeners:
+    def start_listening(self, listeners):
+        for listener in listeners:
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
@@ -253,7 +165,7 @@ class PusherServer(HomeServer):
     def replicate(self):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
-        replication_url = self.config.replication_url
+        replication_url = self.config.worker_replication_url
         pusher_pool = self.get_pusherpool()
         clock = self.get_clock()
 
@@ -329,19 +241,30 @@ class PusherServer(HomeServer):
                 yield sleep(30)
 
 
-def setup(config_options):
+def start(config_options):
     try:
-        config = PusherSlaveConfig.load_config(
+        config = HomeServerConfig.load_config(
             "Synapse pusher", config_options
         )
     except ConfigError as e:
         sys.stderr.write("\n" + e.message + "\n")
         sys.exit(1)
 
-    if not config:
-        sys.exit(0)
+    assert config.worker_app == "synapse.app.pusher"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    if config.start_pushers:
+        sys.stderr.write(
+            "\nThe pushers must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``start_pushers: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
 
-    config.setup_logging()
+    # Force the pushers to start since they will be disabled in the main config
+    config.start_pushers = True
 
     database_engine = create_engine(config.database_config)
 
@@ -354,11 +277,15 @@ def setup(config_options):
     )
 
     ps.setup()
-    ps.start_listening()
-
-    change_resource_limit(ps.config.soft_file_limit)
-    if ps.config.gc_thresholds:
-        gc.set_threshold(*ps.config.gc_thresholds)
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                ps.set_threshold(config.gc_thresholds)
+            reactor.run()
 
     def start():
         ps.replicate()
@@ -367,30 +294,20 @@ def setup(config_options):
 
     reactor.callWhenRunning(start)
 
-    return ps
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-pusher",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
 
 
 if __name__ == '__main__':
     with LoggingContext("main"):
-        ps = setup(sys.argv[1:])
-
-        if ps.config.daemonize:
-            def run():
-                with LoggingContext("run"):
-                    change_resource_limit(ps.config.soft_file_limit)
-                    if ps.config.gc_thresholds:
-                        gc.set_threshold(*ps.config.gc_thresholds)
-                    reactor.run()
-
-            daemon = Daemonize(
-                app="synapse-pusher",
-                pid=ps.config.pid_file,
-                action=run,
-                auto_close_fds=False,
-                verbose=True,
-                logger=logger,
-            )
-
-            daemon.start()
-        else:
-            reactor.run()
+        ps = start(sys.argv[1:])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 297e199453..160db8637e 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -18,9 +18,8 @@ import synapse
 
 from synapse.api.constants import EventTypes, PresenceState
 from synapse.config._base import ConfigError
-from synapse.config.database import DatabaseConfig
-from synapse.config.logger import LoggingConfig
-from synapse.config.appservice import AppServiceConfig
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.events import FrozenEvent
 from synapse.handlers.presence import PresenceHandler
 from synapse.http.site import SynapseSite
@@ -57,76 +56,11 @@ from daemonize import Daemonize
 import sys
 import logging
 import contextlib
-import gc
 import ujson as json
 
 logger = logging.getLogger("synapse.app.synchrotron")
 
 
-class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
-    def read_config(self, config):
-        self.replication_url = config["replication_url"]
-        self.server_name = config["server_name"]
-        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
-            "use_insecure_ssl_client_just_for_testing_do_not_use", False
-        )
-        self.user_agent_suffix = None
-        self.listeners = config["listeners"]
-        self.soft_file_limit = config.get("soft_file_limit")
-        self.daemonize = config.get("daemonize")
-        self.pid_file = self.abspath(config.get("pid_file"))
-        self.macaroon_secret_key = config["macaroon_secret_key"]
-        self.expire_access_token = config.get("expire_access_token", False)
-
-        thresholds = config.get("gc_thresholds", None)
-        if thresholds is not None:
-            try:
-                assert len(thresholds) == 3
-                self.gc_thresholds = (
-                    int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
-                )
-            except:
-                raise ConfigError(
-                    "Value of `gc_threshold` must be a list of three integers if set"
-                )
-        else:
-            self.gc_thresholds = None
-
-    def default_config(self, server_name, **kwargs):
-        pid_file = self.abspath("synchroton.pid")
-        return """\
-        # Slave configuration
-
-        # The replication listener on the synapse to talk to.
-        #replication_url: https://localhost:{replication_port}/_synapse/replication
-
-        server_name: "%(server_name)s"
-
-        listeners:
-        # Enable a /sync listener on the synchrontron
-        #- type: http
-        #    port: {http_port}
-        #    bind_address: ""
-        # Enable a ssh manhole listener on the synchrotron
-        # - type: manhole
-        #   port: {manhole_port}
-        #   bind_address: 127.0.0.1
-        # Enable a metric listener on the synchrotron
-        # - type: http
-        #   port: {metrics_port}
-        #   bind_address: 127.0.0.1
-        #   resources:
-        #    - names: ["metrics"]
-        #      compress: False
-
-        report_stats: False
-
-        daemonize: False
-
-        pid_file: %(pid_file)s
-        """ % locals()
-
-
 class SynchrotronSlavedStore(
     SlavedPushRuleStore,
     SlavedEventStore,
@@ -163,7 +97,7 @@ class SynchrotronPresence(object):
         self.http_client = hs.get_simple_http_client()
         self.store = hs.get_datastore()
         self.user_to_num_current_syncs = {}
-        self.syncing_users_url = hs.config.replication_url + "/syncing_users"
+        self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
         self.clock = hs.get_clock()
 
         active_presence = self.store.take_presence_startup_info()
@@ -350,8 +284,8 @@ class SynchrotronServer(HomeServer):
         )
         logger.info("Synapse synchrotron now listening on port %d", port)
 
-    def start_listening(self):
-        for listener in self.config.listeners:
+    def start_listening(self, listeners):
+        for listener in listeners:
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
@@ -371,7 +305,7 @@ class SynchrotronServer(HomeServer):
     def replicate(self):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
-        replication_url = self.config.replication_url
+        replication_url = self.config.worker_replication_url
         clock = self.get_clock()
         notifier = self.get_notifier()
         presence_handler = self.get_presence_handler()
@@ -470,19 +404,18 @@ class SynchrotronServer(HomeServer):
         return SynchrotronTyping(self)
 
 
-def setup(config_options):
+def start(config_options):
     try:
-        config = SynchrotronConfig.load_config(
+        config = HomeServerConfig.load_config(
             "Synapse synchrotron", config_options
         )
     except ConfigError as e:
         sys.stderr.write("\n" + e.message + "\n")
         sys.exit(1)
 
-    if not config:
-        sys.exit(0)
+    assert config.worker_app == "synapse.app.synchrotron"
 
-    config.setup_logging()
+    setup_logging(config.worker_log_config, config.worker_log_file)
 
     database_engine = create_engine(config.database_config)
 
@@ -496,11 +429,15 @@ def setup(config_options):
     )
 
     ss.setup()
-    ss.start_listening()
-
-    change_resource_limit(ss.config.soft_file_limit)
-    if ss.config.gc_thresholds:
-        ss.set_threshold(*ss.config.gc_thresholds)
+    ss.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                ss.set_threshold(config.gc_thresholds)
+            reactor.run()
 
     def start():
         ss.get_datastore().start_profiling()
@@ -508,30 +445,20 @@ def setup(config_options):
 
     reactor.callWhenRunning(start)
 
-    return ss
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-synchrotron",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
 
 
 if __name__ == '__main__':
     with LoggingContext("main"):
-        ss = setup(sys.argv[1:])
-
-        if ss.config.daemonize:
-            def run():
-                with LoggingContext("run"):
-                    change_resource_limit(ss.config.soft_file_limit)
-                    if ss.config.gc_thresholds:
-                        gc.set_threshold(*ss.config.gc_thresholds)
-                    reactor.run()
-
-            daemon = Daemonize(
-                app="synapse-synchrotron",
-                pid=ss.config.pid_file,
-                action=run,
-                auto_close_fds=False,
-                verbose=True,
-                logger=logger,
-            )
-
-            daemon.start()
-        else:
-            reactor.run()
+        start(sys.argv[1:])
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index fc2445484c..79b0534b3b 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -32,13 +32,15 @@ from .password import PasswordConfig
 from .jwt import JWTConfig
 from .ldap import LDAPConfig
 from .emailconfig import EmailConfig
+from .workers import WorkerConfig
 
 
 class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
                        RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
                        VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
                        AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
-                       JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,):
+                       JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,
+                       WorkerConfig,):
     pass
 
 
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 5047db898f..dc68683fbc 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -126,54 +126,58 @@ class LoggingConfig(Config):
                 )
 
     def setup_logging(self):
-        log_format = (
-            "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
-            " - %(message)s"
-        )
-        if self.log_config is None:
-
-            level = logging.INFO
-            level_for_storage = logging.INFO
-            if self.verbosity:
-                level = logging.DEBUG
-                if self.verbosity > 1:
-                    level_for_storage = logging.DEBUG
-
-            # FIXME: we need a logging.WARN for a -q quiet option
-            logger = logging.getLogger('')
-            logger.setLevel(level)
-
-            logging.getLogger('synapse.storage').setLevel(level_for_storage)
-
-            formatter = logging.Formatter(log_format)
-            if self.log_file:
-                # TODO: Customisable file size / backup count
-                handler = logging.handlers.RotatingFileHandler(
-                    self.log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
-                )
-
-                def sighup(signum, stack):
-                    logger.info("Closing log file due to SIGHUP")
-                    handler.doRollover()
-                    logger.info("Opened new log file due to SIGHUP")
-
-                # TODO(paul): obviously this is a terrible mechanism for
-                #   stealing SIGHUP, because it means no other part of synapse
-                #   can use it instead. If we want to catch SIGHUP anywhere
-                #   else as well, I'd suggest we find a nicer way to broadcast
-                #   it around.
-                if getattr(signal, "SIGHUP"):
-                    signal.signal(signal.SIGHUP, sighup)
-            else:
-                handler = logging.StreamHandler()
-            handler.setFormatter(formatter)
-
-            handler.addFilter(LoggingContextFilter(request=""))
-
-            logger.addHandler(handler)
+        setup_logging(self.log_config, self.log_file, self.verbosity)
+
+
+def setup_logging(log_config=None, log_file=None, verbosity=None):
+    log_format = (
+        "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
+        " - %(message)s"
+    )
+    if log_config is None:
+
+        level = logging.INFO
+        level_for_storage = logging.INFO
+        if verbosity:
+            level = logging.DEBUG
+            if verbosity > 1:
+                level_for_storage = logging.DEBUG
+
+        # FIXME: we need a logging.WARN for a -q quiet option
+        logger = logging.getLogger('')
+        logger.setLevel(level)
+
+        logging.getLogger('synapse.storage').setLevel(level_for_storage)
+
+        formatter = logging.Formatter(log_format)
+        if log_file:
+            # TODO: Customisable file size / backup count
+            handler = logging.handlers.RotatingFileHandler(
+                log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
+            )
+
+            def sighup(signum, stack):
+                logger.info("Closing log file due to SIGHUP")
+                handler.doRollover()
+                logger.info("Opened new log file due to SIGHUP")
+
+            # TODO(paul): obviously this is a terrible mechanism for
+            #   stealing SIGHUP, because it means no other part of synapse
+            #   can use it instead. If we want to catch SIGHUP anywhere
+            #   else as well, I'd suggest we find a nicer way to broadcast
+            #   it around.
+            if getattr(signal, "SIGHUP"):
+                signal.signal(signal.SIGHUP, sighup)
         else:
-            with open(self.log_config, 'r') as f:
-                logging.config.dictConfig(yaml.load(f))
+            handler = logging.StreamHandler()
+        handler.setFormatter(formatter)
+
+        handler.addFilter(LoggingContextFilter(request=""))
+
+        logger.addHandler(handler)
+    else:
+        with open(log_config, 'r') as f:
+            logging.config.dictConfig(yaml.load(f))
 
-        observer = PythonLoggingObserver()
-        observer.start()
+    observer = PythonLoggingObserver()
+    observer.start()
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 44b8d422e0..f370b22c32 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -38,19 +38,7 @@ class ServerConfig(Config):
 
         self.listeners = config.get("listeners", [])
 
-        thresholds = config.get("gc_thresholds", None)
-        if thresholds is not None:
-            try:
-                assert len(thresholds) == 3
-                self.gc_thresholds = (
-                    int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
-                )
-            except:
-                raise ConfigError(
-                    "Value of `gc_threshold` must be a list of three integers if set"
-                )
-        else:
-            self.gc_thresholds = None
+        self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
 
         bind_port = config.get("bind_port")
         if bind_port:
@@ -264,3 +252,20 @@ class ServerConfig(Config):
                                   type=int,
                                   help="Turn on the twisted telnet manhole"
                                   " service on the given port.")
+
+
+def read_gc_thresholds(thresholds):
+    """Reads the three integer thresholds for garbage collection. Ensures that
+    the thresholds are integers if thresholds are supplied.
+    """
+    if thresholds is None:
+        return None
+    try:
+        assert len(thresholds) == 3
+        return (
+            int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
+        )
+    except:
+        raise ConfigError(
+            "Value of `gc_threshold` must be a list of three integers if set"
+        )
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
new file mode 100644
index 0000000000..904789d155
--- /dev/null
+++ b/synapse/config/workers.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
+class WorkerConfig(Config):
+    """The workers are processes run separately to the main synapse process.
+    They have their own pid_file and listener configuration. They use the
+    replication_url to talk to the main synapse process."""
+
+    def read_config(self, config):
+        self.worker_app = config.get("worker_app")
+        self.worker_listeners = config.get("worker_listeners")
+        self.worker_daemonize = config.get("worker_daemonize")
+        self.worker_pid_file = config.get("worker_pid_file")
+        self.worker_log_file = config.get("worker_log_file")
+        self.worker_log_config = config.get("worker_log_config")
+        self.worker_replication_url = config.get("worker_replication_url")