diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 4ec23d84c1..9ac26d52c6 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -18,10 +18,8 @@ import synapse
from synapse.server import HomeServer
from synapse.config._base import ConfigError
-from synapse.config.database import DatabaseConfig
-from synapse.config.logger import LoggingConfig
-from synapse.config.emailconfig import EmailConfig
-from synapse.config.key import KeyConfig
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
from synapse.http.site import SynapseSite
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.storage.roommember import RoomMemberStore
@@ -43,98 +41,12 @@ from twisted.web.resource import Resource
from daemonize import Daemonize
-import gc
import sys
import logging
logger = logging.getLogger("synapse.app.pusher")
-class SlaveConfig(DatabaseConfig):
- def read_config(self, config):
- self.replication_url = config["replication_url"]
- self.server_name = config["server_name"]
- self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
- "use_insecure_ssl_client_just_for_testing_do_not_use", False
- )
- self.user_agent_suffix = None
- self.start_pushers = True
- self.listeners = config["listeners"]
- self.soft_file_limit = config.get("soft_file_limit")
- self.daemonize = config.get("daemonize")
- self.pid_file = self.abspath(config.get("pid_file"))
- self.public_baseurl = config["public_baseurl"]
-
- thresholds = config.get("gc_thresholds", None)
- if thresholds is not None:
- try:
- assert len(thresholds) == 3
- self.gc_thresholds = (
- int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
- )
- except:
- raise ConfigError(
- "Value of `gc_threshold` must be a list of three integers if set"
- )
- else:
- self.gc_thresholds = None
-
- # some things used by the auth handler but not actually used in the
- # pusher codebase
- self.bcrypt_rounds = None
- self.ldap_enabled = None
- self.ldap_server = None
- self.ldap_port = None
- self.ldap_tls = None
- self.ldap_search_base = None
- self.ldap_search_property = None
- self.ldap_email_property = None
- self.ldap_full_name_property = None
-
- # We would otherwise try to use the registration shared secret as the
- # macaroon shared secret if there was no macaroon_shared_secret, but
- # that means pulling in RegistrationConfig too. We don't need to be
- # backwards compaitible in the pusher codebase so just make people set
- # macaroon_shared_secret. We set this to None to prevent it referencing
- # an undefined key.
- self.registration_shared_secret = None
-
- def default_config(self, server_name, **kwargs):
- pid_file = self.abspath("pusher.pid")
- return """\
- # Slave configuration
-
- # The replication listener on the synapse to talk to.
- #replication_url: https://localhost:{replication_port}/_synapse/replication
-
- server_name: "%(server_name)s"
-
- listeners: []
- # Enable a ssh manhole listener on the pusher.
- # - type: manhole
- # port: {manhole_port}
- # bind_address: 127.0.0.1
- # Enable a metric listener on the pusher.
- # - type: http
- # port: {metrics_port}
- # bind_address: 127.0.0.1
- # resources:
- # - names: ["metrics"]
- # compress: False
-
- report_stats: False
-
- daemonize: False
-
- pid_file: %(pid_file)s
-
- """ % locals()
-
-
-class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig):
- pass
-
-
class PusherSlaveStore(
SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
SlavedAccountDataStore
@@ -199,7 +111,7 @@ class PusherServer(HomeServer):
def remove_pusher(self, app_id, push_key, user_id):
http_client = self.get_simple_http_client()
- replication_url = self.config.replication_url
+ replication_url = self.config.worker_replication_url
url = replication_url + "/remove_pushers"
return http_client.post_json_get_json(url, {
"remove": [{
@@ -232,8 +144,8 @@ class PusherServer(HomeServer):
)
logger.info("Synapse pusher now listening on port %d", port)
- def start_listening(self):
- for listener in self.config.listeners:
+ def start_listening(self, listeners):
+ for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
@@ -253,7 +165,7 @@ class PusherServer(HomeServer):
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
- replication_url = self.config.replication_url
+ replication_url = self.config.worker_replication_url
pusher_pool = self.get_pusherpool()
clock = self.get_clock()
@@ -329,19 +241,30 @@ class PusherServer(HomeServer):
yield sleep(30)
-def setup(config_options):
+def start(config_options):
try:
- config = PusherSlaveConfig.load_config(
+ config = HomeServerConfig.load_config(
"Synapse pusher", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.exit(1)
- if not config:
- sys.exit(0)
+ assert config.worker_app == "synapse.app.pusher"
+
+ setup_logging(config.worker_log_config, config.worker_log_file)
+
+ if config.start_pushers:
+ sys.stderr.write(
+ "\nThe pushers must be disabled in the main synapse process"
+ "\nbefore they can be run in a separate worker."
+ "\nPlease add ``start_pushers: false`` to the main config"
+ "\n"
+ )
+ sys.exit(1)
- config.setup_logging()
+ # Force the pushers to start since they will be disabled in the main config
+ config.start_pushers = True
database_engine = create_engine(config.database_config)
@@ -354,11 +277,15 @@ def setup(config_options):
)
ps.setup()
- ps.start_listening()
-
- change_resource_limit(ps.config.soft_file_limit)
- if ps.config.gc_thresholds:
- gc.set_threshold(*ps.config.gc_thresholds)
+ ps.start_listening(config.worker_listeners)
+
+ def run():
+ with LoggingContext("run"):
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ ps.set_threshold(config.gc_thresholds)
+ reactor.run()
def start():
ps.replicate()
@@ -367,30 +294,20 @@ def setup(config_options):
reactor.callWhenRunning(start)
- return ps
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-pusher",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
if __name__ == '__main__':
with LoggingContext("main"):
- ps = setup(sys.argv[1:])
-
- if ps.config.daemonize:
- def run():
- with LoggingContext("run"):
- change_resource_limit(ps.config.soft_file_limit)
- if ps.config.gc_thresholds:
- gc.set_threshold(*ps.config.gc_thresholds)
- reactor.run()
-
- daemon = Daemonize(
- app="synapse-pusher",
- pid=ps.config.pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
-
- daemon.start()
- else:
- reactor.run()
+ ps = start(sys.argv[1:])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 297e199453..160db8637e 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -18,9 +18,8 @@ import synapse
from synapse.api.constants import EventTypes, PresenceState
from synapse.config._base import ConfigError
-from synapse.config.database import DatabaseConfig
-from synapse.config.logger import LoggingConfig
-from synapse.config.appservice import AppServiceConfig
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.events import FrozenEvent
from synapse.handlers.presence import PresenceHandler
from synapse.http.site import SynapseSite
@@ -57,76 +56,11 @@ from daemonize import Daemonize
import sys
import logging
import contextlib
-import gc
import ujson as json
logger = logging.getLogger("synapse.app.synchrotron")
-class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
- def read_config(self, config):
- self.replication_url = config["replication_url"]
- self.server_name = config["server_name"]
- self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
- "use_insecure_ssl_client_just_for_testing_do_not_use", False
- )
- self.user_agent_suffix = None
- self.listeners = config["listeners"]
- self.soft_file_limit = config.get("soft_file_limit")
- self.daemonize = config.get("daemonize")
- self.pid_file = self.abspath(config.get("pid_file"))
- self.macaroon_secret_key = config["macaroon_secret_key"]
- self.expire_access_token = config.get("expire_access_token", False)
-
- thresholds = config.get("gc_thresholds", None)
- if thresholds is not None:
- try:
- assert len(thresholds) == 3
- self.gc_thresholds = (
- int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
- )
- except:
- raise ConfigError(
- "Value of `gc_threshold` must be a list of three integers if set"
- )
- else:
- self.gc_thresholds = None
-
- def default_config(self, server_name, **kwargs):
- pid_file = self.abspath("synchroton.pid")
- return """\
- # Slave configuration
-
- # The replication listener on the synapse to talk to.
- #replication_url: https://localhost:{replication_port}/_synapse/replication
-
- server_name: "%(server_name)s"
-
- listeners:
- # Enable a /sync listener on the synchrontron
- #- type: http
- # port: {http_port}
- # bind_address: ""
- # Enable a ssh manhole listener on the synchrotron
- # - type: manhole
- # port: {manhole_port}
- # bind_address: 127.0.0.1
- # Enable a metric listener on the synchrotron
- # - type: http
- # port: {metrics_port}
- # bind_address: 127.0.0.1
- # resources:
- # - names: ["metrics"]
- # compress: False
-
- report_stats: False
-
- daemonize: False
-
- pid_file: %(pid_file)s
- """ % locals()
-
-
class SynchrotronSlavedStore(
SlavedPushRuleStore,
SlavedEventStore,
@@ -163,7 +97,7 @@ class SynchrotronPresence(object):
self.http_client = hs.get_simple_http_client()
self.store = hs.get_datastore()
self.user_to_num_current_syncs = {}
- self.syncing_users_url = hs.config.replication_url + "/syncing_users"
+ self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
self.clock = hs.get_clock()
active_presence = self.store.take_presence_startup_info()
@@ -350,8 +284,8 @@ class SynchrotronServer(HomeServer):
)
logger.info("Synapse synchrotron now listening on port %d", port)
- def start_listening(self):
- for listener in self.config.listeners:
+ def start_listening(self, listeners):
+ for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
@@ -371,7 +305,7 @@ class SynchrotronServer(HomeServer):
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
- replication_url = self.config.replication_url
+ replication_url = self.config.worker_replication_url
clock = self.get_clock()
notifier = self.get_notifier()
presence_handler = self.get_presence_handler()
@@ -470,19 +404,18 @@ class SynchrotronServer(HomeServer):
return SynchrotronTyping(self)
-def setup(config_options):
+def start(config_options):
try:
- config = SynchrotronConfig.load_config(
+ config = HomeServerConfig.load_config(
"Synapse synchrotron", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.exit(1)
- if not config:
- sys.exit(0)
+ assert config.worker_app == "synapse.app.synchrotron"
- config.setup_logging()
+ setup_logging(config.worker_log_config, config.worker_log_file)
database_engine = create_engine(config.database_config)
@@ -496,11 +429,15 @@ def setup(config_options):
)
ss.setup()
- ss.start_listening()
-
- change_resource_limit(ss.config.soft_file_limit)
- if ss.config.gc_thresholds:
- ss.set_threshold(*ss.config.gc_thresholds)
+ ss.start_listening(config.worker_listeners)
+
+ def run():
+ with LoggingContext("run"):
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ ss.set_threshold(config.gc_thresholds)
+ reactor.run()
def start():
ss.get_datastore().start_profiling()
@@ -508,30 +445,20 @@ def setup(config_options):
reactor.callWhenRunning(start)
- return ss
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-synchrotron",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
if __name__ == '__main__':
with LoggingContext("main"):
- ss = setup(sys.argv[1:])
-
- if ss.config.daemonize:
- def run():
- with LoggingContext("run"):
- change_resource_limit(ss.config.soft_file_limit)
- if ss.config.gc_thresholds:
- gc.set_threshold(*ss.config.gc_thresholds)
- reactor.run()
-
- daemon = Daemonize(
- app="synapse-synchrotron",
- pid=ss.config.pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
-
- daemon.start()
- else:
- reactor.run()
+ start(sys.argv[1:])
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index fc2445484c..79b0534b3b 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -32,13 +32,15 @@ from .password import PasswordConfig
from .jwt import JWTConfig
from .ldap import LDAPConfig
from .emailconfig import EmailConfig
+from .workers import WorkerConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
- JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,):
+ JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,
+ WorkerConfig,):
pass
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 5047db898f..dc68683fbc 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -126,54 +126,58 @@ class LoggingConfig(Config):
)
def setup_logging(self):
- log_format = (
- "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
- " - %(message)s"
- )
- if self.log_config is None:
-
- level = logging.INFO
- level_for_storage = logging.INFO
- if self.verbosity:
- level = logging.DEBUG
- if self.verbosity > 1:
- level_for_storage = logging.DEBUG
-
- # FIXME: we need a logging.WARN for a -q quiet option
- logger = logging.getLogger('')
- logger.setLevel(level)
-
- logging.getLogger('synapse.storage').setLevel(level_for_storage)
-
- formatter = logging.Formatter(log_format)
- if self.log_file:
- # TODO: Customisable file size / backup count
- handler = logging.handlers.RotatingFileHandler(
- self.log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
- )
-
- def sighup(signum, stack):
- logger.info("Closing log file due to SIGHUP")
- handler.doRollover()
- logger.info("Opened new log file due to SIGHUP")
-
- # TODO(paul): obviously this is a terrible mechanism for
- # stealing SIGHUP, because it means no other part of synapse
- # can use it instead. If we want to catch SIGHUP anywhere
- # else as well, I'd suggest we find a nicer way to broadcast
- # it around.
- if getattr(signal, "SIGHUP"):
- signal.signal(signal.SIGHUP, sighup)
- else:
- handler = logging.StreamHandler()
- handler.setFormatter(formatter)
-
- handler.addFilter(LoggingContextFilter(request=""))
-
- logger.addHandler(handler)
+ setup_logging(self.log_config, self.log_file, self.verbosity)
+
+
+def setup_logging(log_config=None, log_file=None, verbosity=None):
+ log_format = (
+ "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
+ " - %(message)s"
+ )
+ if log_config is None:
+
+ level = logging.INFO
+ level_for_storage = logging.INFO
+ if verbosity:
+ level = logging.DEBUG
+ if verbosity > 1:
+ level_for_storage = logging.DEBUG
+
+ # FIXME: we need a logging.WARN for a -q quiet option
+ logger = logging.getLogger('')
+ logger.setLevel(level)
+
+ logging.getLogger('synapse.storage').setLevel(level_for_storage)
+
+ formatter = logging.Formatter(log_format)
+ if log_file:
+ # TODO: Customisable file size / backup count
+ handler = logging.handlers.RotatingFileHandler(
+ log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
+ )
+
+ def sighup(signum, stack):
+ logger.info("Closing log file due to SIGHUP")
+ handler.doRollover()
+ logger.info("Opened new log file due to SIGHUP")
+
+ # TODO(paul): obviously this is a terrible mechanism for
+ # stealing SIGHUP, because it means no other part of synapse
+ # can use it instead. If we want to catch SIGHUP anywhere
+ # else as well, I'd suggest we find a nicer way to broadcast
+ # it around.
+ if getattr(signal, "SIGHUP"):
+ signal.signal(signal.SIGHUP, sighup)
else:
- with open(self.log_config, 'r') as f:
- logging.config.dictConfig(yaml.load(f))
+ handler = logging.StreamHandler()
+ handler.setFormatter(formatter)
+
+ handler.addFilter(LoggingContextFilter(request=""))
+
+ logger.addHandler(handler)
+ else:
+ with open(log_config, 'r') as f:
+ logging.config.dictConfig(yaml.load(f))
- observer = PythonLoggingObserver()
- observer.start()
+ observer = PythonLoggingObserver()
+ observer.start()
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 44b8d422e0..f370b22c32 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -38,19 +38,7 @@ class ServerConfig(Config):
self.listeners = config.get("listeners", [])
- thresholds = config.get("gc_thresholds", None)
- if thresholds is not None:
- try:
- assert len(thresholds) == 3
- self.gc_thresholds = (
- int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
- )
- except:
- raise ConfigError(
- "Value of `gc_threshold` must be a list of three integers if set"
- )
- else:
- self.gc_thresholds = None
+ self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
bind_port = config.get("bind_port")
if bind_port:
@@ -264,3 +252,20 @@ class ServerConfig(Config):
type=int,
help="Turn on the twisted telnet manhole"
" service on the given port.")
+
+
+def read_gc_thresholds(thresholds):
+ """Reads the three integer thresholds for garbage collection. Ensures that
+ the thresholds are integers if thresholds are supplied.
+ """
+ if thresholds is None:
+ return None
+ try:
+ assert len(thresholds) == 3
+ return (
+ int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
+ )
+ except:
+ raise ConfigError(
+ "Value of `gc_threshold` must be a list of three integers if set"
+ )
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
new file mode 100644
index 0000000000..904789d155
--- /dev/null
+++ b/synapse/config/workers.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
+class WorkerConfig(Config):
+ """The workers are processes run separately to the main synapse process.
+ They have their own pid_file and listener configuration. They use the
+ replication_url to talk to the main synapse process."""
+
+ def read_config(self, config):
+ self.worker_app = config.get("worker_app")
+ self.worker_listeners = config.get("worker_listeners")
+ self.worker_daemonize = config.get("worker_daemonize")
+ self.worker_pid_file = config.get("worker_pid_file")
+ self.worker_log_file = config.get("worker_log_file")
+ self.worker_log_config = config.get("worker_log_config")
+ self.worker_replication_url = config.get("worker_replication_url")
|