summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/app/pusher.py171
-rw-r--r--synapse/app/synchrotron.py137
-rw-r--r--synapse/config/homeserver.py4
-rw-r--r--synapse/config/logger.py102
-rw-r--r--synapse/config/server.py31
5 files changed, 154 insertions, 291 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 4ec23d84c1..6c8c02fb38 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -18,10 +18,9 @@ import synapse
 
 from synapse.server import HomeServer
 from synapse.config._base import ConfigError
-from synapse.config.database import DatabaseConfig
-from synapse.config.logger import LoggingConfig
-from synapse.config.emailconfig import EmailConfig
-from synapse.config.key import KeyConfig
+from synapse.config.workers import clobber_with_worker_config
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
 from synapse.http.site import SynapseSite
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.storage.roommember import RoomMemberStore
@@ -43,98 +42,12 @@ from twisted.web.resource import Resource
 
 from daemonize import Daemonize
 
-import gc
 import sys
 import logging
 
 logger = logging.getLogger("synapse.app.pusher")
 
 
-class SlaveConfig(DatabaseConfig):
-    def read_config(self, config):
-        self.replication_url = config["replication_url"]
-        self.server_name = config["server_name"]
-        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
-            "use_insecure_ssl_client_just_for_testing_do_not_use", False
-        )
-        self.user_agent_suffix = None
-        self.start_pushers = True
-        self.listeners = config["listeners"]
-        self.soft_file_limit = config.get("soft_file_limit")
-        self.daemonize = config.get("daemonize")
-        self.pid_file = self.abspath(config.get("pid_file"))
-        self.public_baseurl = config["public_baseurl"]
-
-        thresholds = config.get("gc_thresholds", None)
-        if thresholds is not None:
-            try:
-                assert len(thresholds) == 3
-                self.gc_thresholds = (
-                    int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
-                )
-            except:
-                raise ConfigError(
-                    "Value of `gc_threshold` must be a list of three integers if set"
-                )
-        else:
-            self.gc_thresholds = None
-
-        # some things used by the auth handler but not actually used in the
-        # pusher codebase
-        self.bcrypt_rounds = None
-        self.ldap_enabled = None
-        self.ldap_server = None
-        self.ldap_port = None
-        self.ldap_tls = None
-        self.ldap_search_base = None
-        self.ldap_search_property = None
-        self.ldap_email_property = None
-        self.ldap_full_name_property = None
-
-        # We would otherwise try to use the registration shared secret as the
-        # macaroon shared secret if there was no macaroon_shared_secret, but
-        # that means pulling in RegistrationConfig too. We don't need to be
-        # backwards compaitible in the pusher codebase so just make people set
-        # macaroon_shared_secret. We set this to None to prevent it referencing
-        # an undefined key.
-        self.registration_shared_secret = None
-
-    def default_config(self, server_name, **kwargs):
-        pid_file = self.abspath("pusher.pid")
-        return """\
-        # Slave configuration
-
-        # The replication listener on the synapse to talk to.
-        #replication_url: https://localhost:{replication_port}/_synapse/replication
-
-        server_name: "%(server_name)s"
-
-        listeners: []
-        # Enable a ssh manhole listener on the pusher.
-        # - type: manhole
-        #   port: {manhole_port}
-        #   bind_address: 127.0.0.1
-        # Enable a metric listener on the pusher.
-        # - type: http
-        #   port: {metrics_port}
-        #   bind_address: 127.0.0.1
-        #   resources:
-        #    - names: ["metrics"]
-        #      compress: False
-
-        report_stats: False
-
-        daemonize: False
-
-        pid_file: %(pid_file)s
-
-        """ % locals()
-
-
-class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig):
-    pass
-
-
 class PusherSlaveStore(
     SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
     SlavedAccountDataStore
@@ -232,8 +145,8 @@ class PusherServer(HomeServer):
         )
         logger.info("Synapse pusher now listening on port %d", port)
 
-    def start_listening(self):
-        for listener in self.config.listeners:
+    def start_listening(self, listeners):
+        for listener in listeners:
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
@@ -329,19 +242,32 @@ class PusherServer(HomeServer):
                 yield sleep(30)
 
 
-def setup(config_options):
+def setup(worker_name, config_options):
     try:
-        config = PusherSlaveConfig.load_config(
+        config = HomeServerConfig.load_config(
             "Synapse pusher", config_options
         )
     except ConfigError as e:
         sys.stderr.write("\n" + e.message + "\n")
         sys.exit(1)
 
-    if not config:
-        sys.exit(0)
+    worker_config = config.workers[worker_name]
 
-    config.setup_logging()
+    setup_logging(worker_config.log_config, worker_config.log_file)
+
+    clobber_with_worker_config(config, worker_config)
+
+    if config.start_pushers:
+        sys.stderr.write(
+            "\nThe pushers must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``start_pushers: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
+
+    # Force the pushers to start since they will be disabled in the main config
+    config.start_pushers = True
 
     database_engine = create_engine(config.database_config)
 
@@ -354,11 +280,15 @@ def setup(config_options):
     )
 
     ps.setup()
-    ps.start_listening()
-
-    change_resource_limit(ps.config.soft_file_limit)
-    if ps.config.gc_thresholds:
-        gc.set_threshold(*ps.config.gc_thresholds)
+    ps.start_listening(worker_config.listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(worker_config.soft_file_limit)
+            if worker_config.gc_thresholds:
+                ps.set_threshold(worker_config.gc_thresholds)
+            reactor.run()
 
     def start():
         ps.replicate()
@@ -367,30 +297,21 @@ def setup(config_options):
 
     reactor.callWhenRunning(start)
 
-    return ps
+    if worker_config.daemonize:
+        daemon = Daemonize(
+            app="synapse-pusher",
+            pid=worker_config.pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
 
 
 if __name__ == '__main__':
     with LoggingContext("main"):
-        ps = setup(sys.argv[1:])
-
-        if ps.config.daemonize:
-            def run():
-                with LoggingContext("run"):
-                    change_resource_limit(ps.config.soft_file_limit)
-                    if ps.config.gc_thresholds:
-                        gc.set_threshold(*ps.config.gc_thresholds)
-                    reactor.run()
-
-            daemon = Daemonize(
-                app="synapse-pusher",
-                pid=ps.config.pid_file,
-                action=run,
-                auto_close_fds=False,
-                verbose=True,
-                logger=logger,
-            )
-
-            daemon.start()
-        else:
-            reactor.run()
+        worker_name = sys.argv[1]
+        ps = setup(worker_name, sys.argv[2:])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 297e199453..7a607faef6 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -18,9 +18,9 @@ import synapse
 
 from synapse.api.constants import EventTypes, PresenceState
 from synapse.config._base import ConfigError
-from synapse.config.database import DatabaseConfig
-from synapse.config.logger import LoggingConfig
-from synapse.config.appservice import AppServiceConfig
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.config.workers import clobber_with_worker_config
 from synapse.events import FrozenEvent
 from synapse.handlers.presence import PresenceHandler
 from synapse.http.site import SynapseSite
@@ -57,76 +57,11 @@ from daemonize import Daemonize
 import sys
 import logging
 import contextlib
-import gc
 import ujson as json
 
 logger = logging.getLogger("synapse.app.synchrotron")
 
 
-class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
-    def read_config(self, config):
-        self.replication_url = config["replication_url"]
-        self.server_name = config["server_name"]
-        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
-            "use_insecure_ssl_client_just_for_testing_do_not_use", False
-        )
-        self.user_agent_suffix = None
-        self.listeners = config["listeners"]
-        self.soft_file_limit = config.get("soft_file_limit")
-        self.daemonize = config.get("daemonize")
-        self.pid_file = self.abspath(config.get("pid_file"))
-        self.macaroon_secret_key = config["macaroon_secret_key"]
-        self.expire_access_token = config.get("expire_access_token", False)
-
-        thresholds = config.get("gc_thresholds", None)
-        if thresholds is not None:
-            try:
-                assert len(thresholds) == 3
-                self.gc_thresholds = (
-                    int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
-                )
-            except:
-                raise ConfigError(
-                    "Value of `gc_threshold` must be a list of three integers if set"
-                )
-        else:
-            self.gc_thresholds = None
-
-    def default_config(self, server_name, **kwargs):
-        pid_file = self.abspath("synchroton.pid")
-        return """\
-        # Slave configuration
-
-        # The replication listener on the synapse to talk to.
-        #replication_url: https://localhost:{replication_port}/_synapse/replication
-
-        server_name: "%(server_name)s"
-
-        listeners:
-        # Enable a /sync listener on the synchrontron
-        #- type: http
-        #    port: {http_port}
-        #    bind_address: ""
-        # Enable a ssh manhole listener on the synchrotron
-        # - type: manhole
-        #   port: {manhole_port}
-        #   bind_address: 127.0.0.1
-        # Enable a metric listener on the synchrotron
-        # - type: http
-        #   port: {metrics_port}
-        #   bind_address: 127.0.0.1
-        #   resources:
-        #    - names: ["metrics"]
-        #      compress: False
-
-        report_stats: False
-
-        daemonize: False
-
-        pid_file: %(pid_file)s
-        """ % locals()
-
-
 class SynchrotronSlavedStore(
     SlavedPushRuleStore,
     SlavedEventStore,
@@ -350,8 +285,8 @@ class SynchrotronServer(HomeServer):
         )
         logger.info("Synapse synchrotron now listening on port %d", port)
 
-    def start_listening(self):
-        for listener in self.config.listeners:
+    def start_listening(self, listeners):
+        for listener in listeners:
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
@@ -470,19 +405,20 @@ class SynchrotronServer(HomeServer):
         return SynchrotronTyping(self)
 
 
-def setup(config_options):
+def start(worker_name, config_options):
     try:
-        config = SynchrotronConfig.load_config(
+        config = HomeServerConfig.load_config(
             "Synapse synchrotron", config_options
         )
     except ConfigError as e:
         sys.stderr.write("\n" + e.message + "\n")
         sys.exit(1)
 
-    if not config:
-        sys.exit(0)
+    worker_config = config.workers[worker_name]
 
-    config.setup_logging()
+    setup_logging(worker_config.log_config, worker_config.log_file)
+
+    clobber_with_worker_config(config, worker_config)
 
     database_engine = create_engine(config.database_config)
 
@@ -496,11 +432,15 @@ def setup(config_options):
     )
 
     ss.setup()
-    ss.start_listening()
-
-    change_resource_limit(ss.config.soft_file_limit)
-    if ss.config.gc_thresholds:
-        ss.set_threshold(*ss.config.gc_thresholds)
+    ss.start_listening(worker_config.listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(worker_config.soft_file_limit)
+            if worker_config.gc_thresholds:
+                ss.set_threshold(worker_config.gc_thresholds)
+            reactor.run()
 
     def start():
         ss.get_datastore().start_profiling()
@@ -508,30 +448,21 @@ def setup(config_options):
 
     reactor.callWhenRunning(start)
 
-    return ss
+    if worker_config.daemonize:
+        daemon = Daemonize(
+            app="synapse-synchrotron",
+            pid=worker_config.pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
 
 
 if __name__ == '__main__':
     with LoggingContext("main"):
-        ss = setup(sys.argv[1:])
-
-        if ss.config.daemonize:
-            def run():
-                with LoggingContext("run"):
-                    change_resource_limit(ss.config.soft_file_limit)
-                    if ss.config.gc_thresholds:
-                        gc.set_threshold(*ss.config.gc_thresholds)
-                    reactor.run()
-
-            daemon = Daemonize(
-                app="synapse-synchrotron",
-                pid=ss.config.pid_file,
-                action=run,
-                auto_close_fds=False,
-                verbose=True,
-                logger=logger,
-            )
-
-            daemon.start()
-        else:
-            reactor.run()
+        worker_name = sys.argv[1]
+        start(worker_name, sys.argv[2:])
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index fc2445484c..79b0534b3b 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -32,13 +32,15 @@ from .password import PasswordConfig
 from .jwt import JWTConfig
 from .ldap import LDAPConfig
 from .emailconfig import EmailConfig
+from .workers import WorkerConfig
 
 
 class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
                        RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
                        VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
                        AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
-                       JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,):
+                       JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,
+                       WorkerConfig,):
     pass
 
 
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 5047db898f..dc68683fbc 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -126,54 +126,58 @@ class LoggingConfig(Config):
                 )
 
     def setup_logging(self):
-        log_format = (
-            "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
-            " - %(message)s"
-        )
-        if self.log_config is None:
-
-            level = logging.INFO
-            level_for_storage = logging.INFO
-            if self.verbosity:
-                level = logging.DEBUG
-                if self.verbosity > 1:
-                    level_for_storage = logging.DEBUG
-
-            # FIXME: we need a logging.WARN for a -q quiet option
-            logger = logging.getLogger('')
-            logger.setLevel(level)
-
-            logging.getLogger('synapse.storage').setLevel(level_for_storage)
-
-            formatter = logging.Formatter(log_format)
-            if self.log_file:
-                # TODO: Customisable file size / backup count
-                handler = logging.handlers.RotatingFileHandler(
-                    self.log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
-                )
-
-                def sighup(signum, stack):
-                    logger.info("Closing log file due to SIGHUP")
-                    handler.doRollover()
-                    logger.info("Opened new log file due to SIGHUP")
-
-                # TODO(paul): obviously this is a terrible mechanism for
-                #   stealing SIGHUP, because it means no other part of synapse
-                #   can use it instead. If we want to catch SIGHUP anywhere
-                #   else as well, I'd suggest we find a nicer way to broadcast
-                #   it around.
-                if getattr(signal, "SIGHUP"):
-                    signal.signal(signal.SIGHUP, sighup)
-            else:
-                handler = logging.StreamHandler()
-            handler.setFormatter(formatter)
-
-            handler.addFilter(LoggingContextFilter(request=""))
-
-            logger.addHandler(handler)
+        setup_logging(self.log_config, self.log_file, self.verbosity)
+
+
+def setup_logging(log_config=None, log_file=None, verbosity=None):
+    log_format = (
+        "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
+        " - %(message)s"
+    )
+    if log_config is None:
+
+        level = logging.INFO
+        level_for_storage = logging.INFO
+        if verbosity:
+            level = logging.DEBUG
+            if verbosity > 1:
+                level_for_storage = logging.DEBUG
+
+        # FIXME: we need a logging.WARN for a -q quiet option
+        logger = logging.getLogger('')
+        logger.setLevel(level)
+
+        logging.getLogger('synapse.storage').setLevel(level_for_storage)
+
+        formatter = logging.Formatter(log_format)
+        if log_file:
+            # TODO: Customisable file size / backup count
+            handler = logging.handlers.RotatingFileHandler(
+                log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
+            )
+
+            def sighup(signum, stack):
+                logger.info("Closing log file due to SIGHUP")
+                handler.doRollover()
+                logger.info("Opened new log file due to SIGHUP")
+
+            # TODO(paul): obviously this is a terrible mechanism for
+            #   stealing SIGHUP, because it means no other part of synapse
+            #   can use it instead. If we want to catch SIGHUP anywhere
+            #   else as well, I'd suggest we find a nicer way to broadcast
+            #   it around.
+            if getattr(signal, "SIGHUP"):
+                signal.signal(signal.SIGHUP, sighup)
         else:
-            with open(self.log_config, 'r') as f:
-                logging.config.dictConfig(yaml.load(f))
+            handler = logging.StreamHandler()
+        handler.setFormatter(formatter)
+
+        handler.addFilter(LoggingContextFilter(request=""))
+
+        logger.addHandler(handler)
+    else:
+        with open(log_config, 'r') as f:
+            logging.config.dictConfig(yaml.load(f))
 
-        observer = PythonLoggingObserver()
-        observer.start()
+    observer = PythonLoggingObserver()
+    observer.start()
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 44b8d422e0..f370b22c32 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -38,19 +38,7 @@ class ServerConfig(Config):
 
         self.listeners = config.get("listeners", [])
 
-        thresholds = config.get("gc_thresholds", None)
-        if thresholds is not None:
-            try:
-                assert len(thresholds) == 3
-                self.gc_thresholds = (
-                    int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
-                )
-            except:
-                raise ConfigError(
-                    "Value of `gc_threshold` must be a list of three integers if set"
-                )
-        else:
-            self.gc_thresholds = None
+        self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
 
         bind_port = config.get("bind_port")
         if bind_port:
@@ -264,3 +252,20 @@ class ServerConfig(Config):
                                   type=int,
                                   help="Turn on the twisted telnet manhole"
                                   " service on the given port.")
+
+
+def read_gc_thresholds(thresholds):
+    """Reads the three integer thresholds for garbage collection. Ensures that
+    the thresholds are integers if thresholds are supplied.
+    """
+    if thresholds is None:
+        return None
+    try:
+        assert len(thresholds) == 3
+        return (
+            int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
+        )
+    except:
+        raise ConfigError(
+            "Value of `gc_threshold` must be a list of three integers if set"
+        )