From 885ee861f7270fef1370a2d63e009a8fceaf8dd5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Jun 2016 11:06:12 +0100 Subject: Inline the synchrotron and pusher configs into the main config --- synapse/app/pusher.py | 171 ++++++++++++--------------------------------- synapse/app/synchrotron.py | 137 +++++++++--------------------------- 2 files changed, 80 insertions(+), 228 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 4ec23d84c1..6c8c02fb38 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -18,10 +18,9 @@ import synapse from synapse.server import HomeServer from synapse.config._base import ConfigError -from synapse.config.database import DatabaseConfig -from synapse.config.logger import LoggingConfig -from synapse.config.emailconfig import EmailConfig -from synapse.config.key import KeyConfig +from synapse.config.workers import clobber_with_worker_config +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.storage.roommember import RoomMemberStore @@ -43,98 +42,12 @@ from twisted.web.resource import Resource from daemonize import Daemonize -import gc import sys import logging logger = logging.getLogger("synapse.app.pusher") -class SlaveConfig(DatabaseConfig): - def read_config(self, config): - self.replication_url = config["replication_url"] - self.server_name = config["server_name"] - self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( - "use_insecure_ssl_client_just_for_testing_do_not_use", False - ) - self.user_agent_suffix = None - self.start_pushers = True - self.listeners = config["listeners"] - self.soft_file_limit = config.get("soft_file_limit") - self.daemonize = config.get("daemonize") - self.pid_file = self.abspath(config.get("pid_file")) - self.public_baseurl = config["public_baseurl"] - - thresholds = config.get("gc_thresholds", None) - if thresholds is not None: - try: - assert len(thresholds) == 3 - self.gc_thresholds = ( - int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), - ) - except: - raise ConfigError( - "Value of `gc_threshold` must be a list of three integers if set" - ) - else: - self.gc_thresholds = None - - # some things used by the auth handler but not actually used in the - # pusher codebase - self.bcrypt_rounds = None - self.ldap_enabled = None - self.ldap_server = None - self.ldap_port = None - self.ldap_tls = None - self.ldap_search_base = None - self.ldap_search_property = None - self.ldap_email_property = None - self.ldap_full_name_property = None - - # We would otherwise try to use the registration shared secret as the - # macaroon shared secret if there was no macaroon_shared_secret, but - # that means pulling in RegistrationConfig too. We don't need to be - # backwards compaitible in the pusher codebase so just make people set - # macaroon_shared_secret. We set this to None to prevent it referencing - # an undefined key. - self.registration_shared_secret = None - - def default_config(self, server_name, **kwargs): - pid_file = self.abspath("pusher.pid") - return """\ - # Slave configuration - - # The replication listener on the synapse to talk to. - #replication_url: https://localhost:{replication_port}/_synapse/replication - - server_name: "%(server_name)s" - - listeners: [] - # Enable a ssh manhole listener on the pusher. - # - type: manhole - # port: {manhole_port} - # bind_address: 127.0.0.1 - # Enable a metric listener on the pusher. - # - type: http - # port: {metrics_port} - # bind_address: 127.0.0.1 - # resources: - # - names: ["metrics"] - # compress: False - - report_stats: False - - daemonize: False - - pid_file: %(pid_file)s - - """ % locals() - - -class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig): - pass - - class PusherSlaveStore( SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore @@ -232,8 +145,8 @@ class PusherServer(HomeServer): ) logger.info("Synapse pusher now listening on port %d", port) - def start_listening(self): - for listener in self.config.listeners: + def start_listening(self, listeners): + for listener in listeners: if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": @@ -329,19 +242,32 @@ class PusherServer(HomeServer): yield sleep(30) -def setup(config_options): +def setup(worker_name, config_options): try: - config = PusherSlaveConfig.load_config( + config = HomeServerConfig.load_config( "Synapse pusher", config_options ) except ConfigError as e: sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - if not config: - sys.exit(0) + worker_config = config.workers[worker_name] - config.setup_logging() + setup_logging(worker_config.log_config, worker_config.log_file) + + clobber_with_worker_config(config, worker_config) + + if config.start_pushers: + sys.stderr.write( + "\nThe pushers must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``start_pushers: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.start_pushers = True database_engine = create_engine(config.database_config) @@ -354,11 +280,15 @@ def setup(config_options): ) ps.setup() - ps.start_listening() - - change_resource_limit(ps.config.soft_file_limit) - if ps.config.gc_thresholds: - gc.set_threshold(*ps.config.gc_thresholds) + ps.start_listening(worker_config.listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(worker_config.soft_file_limit) + if worker_config.gc_thresholds: + ps.set_threshold(worker_config.gc_thresholds) + reactor.run() def start(): ps.replicate() @@ -367,30 +297,21 @@ def setup(config_options): reactor.callWhenRunning(start) - return ps + if worker_config.daemonize: + daemon = Daemonize( + app="synapse-pusher", + pid=worker_config.pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() if __name__ == '__main__': with LoggingContext("main"): - ps = setup(sys.argv[1:]) - - if ps.config.daemonize: - def run(): - with LoggingContext("run"): - change_resource_limit(ps.config.soft_file_limit) - if ps.config.gc_thresholds: - gc.set_threshold(*ps.config.gc_thresholds) - reactor.run() - - daemon = Daemonize( - app="synapse-pusher", - pid=ps.config.pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - - daemon.start() - else: - reactor.run() + worker_name = sys.argv[1] + ps = setup(worker_name, sys.argv[2:]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 297e199453..7a607faef6 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -18,9 +18,9 @@ import synapse from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError -from synapse.config.database import DatabaseConfig -from synapse.config.logger import LoggingConfig -from synapse.config.appservice import AppServiceConfig +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.config.workers import clobber_with_worker_config from synapse.events import FrozenEvent from synapse.handlers.presence import PresenceHandler from synapse.http.site import SynapseSite @@ -57,76 +57,11 @@ from daemonize import Daemonize import sys import logging import contextlib -import gc import ujson as json logger = logging.getLogger("synapse.app.synchrotron") -class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): - def read_config(self, config): - self.replication_url = config["replication_url"] - self.server_name = config["server_name"] - self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( - "use_insecure_ssl_client_just_for_testing_do_not_use", False - ) - self.user_agent_suffix = None - self.listeners = config["listeners"] - self.soft_file_limit = config.get("soft_file_limit") - self.daemonize = config.get("daemonize") - self.pid_file = self.abspath(config.get("pid_file")) - self.macaroon_secret_key = config["macaroon_secret_key"] - self.expire_access_token = config.get("expire_access_token", False) - - thresholds = config.get("gc_thresholds", None) - if thresholds is not None: - try: - assert len(thresholds) == 3 - self.gc_thresholds = ( - int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), - ) - except: - raise ConfigError( - "Value of `gc_threshold` must be a list of three integers if set" - ) - else: - self.gc_thresholds = None - - def default_config(self, server_name, **kwargs): - pid_file = self.abspath("synchroton.pid") - return """\ - # Slave configuration - - # The replication listener on the synapse to talk to. - #replication_url: https://localhost:{replication_port}/_synapse/replication - - server_name: "%(server_name)s" - - listeners: - # Enable a /sync listener on the synchrontron - #- type: http - # port: {http_port} - # bind_address: "" - # Enable a ssh manhole listener on the synchrotron - # - type: manhole - # port: {manhole_port} - # bind_address: 127.0.0.1 - # Enable a metric listener on the synchrotron - # - type: http - # port: {metrics_port} - # bind_address: 127.0.0.1 - # resources: - # - names: ["metrics"] - # compress: False - - report_stats: False - - daemonize: False - - pid_file: %(pid_file)s - """ % locals() - - class SynchrotronSlavedStore( SlavedPushRuleStore, SlavedEventStore, @@ -350,8 +285,8 @@ class SynchrotronServer(HomeServer): ) logger.info("Synapse synchrotron now listening on port %d", port) - def start_listening(self): - for listener in self.config.listeners: + def start_listening(self, listeners): + for listener in listeners: if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": @@ -470,19 +405,20 @@ class SynchrotronServer(HomeServer): return SynchrotronTyping(self) -def setup(config_options): +def start(worker_name, config_options): try: - config = SynchrotronConfig.load_config( + config = HomeServerConfig.load_config( "Synapse synchrotron", config_options ) except ConfigError as e: sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - if not config: - sys.exit(0) + worker_config = config.workers[worker_name] - config.setup_logging() + setup_logging(worker_config.log_config, worker_config.log_file) + + clobber_with_worker_config(config, worker_config) database_engine = create_engine(config.database_config) @@ -496,11 +432,15 @@ def setup(config_options): ) ss.setup() - ss.start_listening() - - change_resource_limit(ss.config.soft_file_limit) - if ss.config.gc_thresholds: - ss.set_threshold(*ss.config.gc_thresholds) + ss.start_listening(worker_config.listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(worker_config.soft_file_limit) + if worker_config.gc_thresholds: + ss.set_threshold(worker_config.gc_thresholds) + reactor.run() def start(): ss.get_datastore().start_profiling() @@ -508,30 +448,21 @@ def setup(config_options): reactor.callWhenRunning(start) - return ss + if worker_config.daemonize: + daemon = Daemonize( + app="synapse-synchrotron", + pid=worker_config.pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() if __name__ == '__main__': with LoggingContext("main"): - ss = setup(sys.argv[1:]) - - if ss.config.daemonize: - def run(): - with LoggingContext("run"): - change_resource_limit(ss.config.soft_file_limit) - if ss.config.gc_thresholds: - gc.set_threshold(*ss.config.gc_thresholds) - reactor.run() - - daemon = Daemonize( - app="synapse-synchrotron", - pid=ss.config.pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - - daemon.start() - else: - reactor.run() + worker_name = sys.argv[1] + start(worker_name, sys.argv[2:]) -- cgit 1.4.1 From bde13833cb42fc6e09928ffb4f4efad9244abffa Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Jun 2016 12:44:40 +0100 Subject: Access replication_url from the worker config directly --- synapse/app/pusher.py | 5 +++-- synapse/app/synchrotron.py | 5 +++-- synapse/config/workers.py | 4 ---- 3 files changed, 6 insertions(+), 8 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 6c8c02fb38..a26a3bd394 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -112,7 +112,7 @@ class PusherServer(HomeServer): def remove_pusher(self, app_id, push_key, user_id): http_client = self.get_simple_http_client() - replication_url = self.config.replication_url + replication_url = self.worker_config.replication_url url = replication_url + "/remove_pushers" return http_client.post_json_get_json(url, { "remove": [{ @@ -166,7 +166,7 @@ class PusherServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.config.replication_url + replication_url = self.worker_config.replication_url pusher_pool = self.get_pusherpool() clock = self.get_clock() @@ -275,6 +275,7 @@ def setup(worker_name, config_options): config.server_name, db_config=config.database_config, config=config, + worker_config=worker_config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, ) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7a607faef6..4443c73e6a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -98,7 +98,7 @@ class SynchrotronPresence(object): self.http_client = hs.get_simple_http_client() self.store = hs.get_datastore() self.user_to_num_current_syncs = {} - self.syncing_users_url = hs.config.replication_url + "/syncing_users" + self.syncing_users_url = hs.worker_config.replication_url + "/syncing_users" self.clock = hs.get_clock() active_presence = self.store.take_presence_startup_info() @@ -306,7 +306,7 @@ class SynchrotronServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.config.replication_url + replication_url = self.worker_config.replication_url clock = self.get_clock() notifier = self.get_notifier() presence_handler = self.get_presence_handler() @@ -426,6 +426,7 @@ def start(worker_name, config_options): config.server_name, db_config=config.database_config, config=config, + worker_config=worker_config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, application_service_handler=SynchrotronApplicationService(), diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 4f4658c0a8..f2c77ef59a 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -46,10 +46,6 @@ def clobber_with_worker_config(config, worker_config): # worker config directly. config.event_cache_size = worker_config.event_cache_size - # TODO: The replication_url should only be accessed within worker specific - # code so it really shouldn't need to be clobbered in the main config. - config.replication_url = worker_config.replication_url - def read_worker_config(config): return Worker( -- cgit 1.4.1 From 364d6167926d5d8b2a312e3d35623d2e05330e0a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Jun 2016 12:53:15 +0100 Subject: Access the event_cache_size directly from the server object. This means that the workers can override the event_cache_size directly without clobbering the value in the main synapse config. --- synapse/app/pusher.py | 6 +++--- synapse/app/synchrotron.py | 6 +++--- synapse/config/workers.py | 14 -------------- synapse/server.py | 3 +++ synapse/storage/_base.py | 2 +- 5 files changed, 10 insertions(+), 21 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index a26a3bd394..5d4db4f892 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -18,7 +18,6 @@ import synapse from synapse.server import HomeServer from synapse.config._base import ConfigError -from synapse.config.workers import clobber_with_worker_config from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig from synapse.http.site import SynapseSite @@ -241,6 +240,9 @@ class PusherServer(HomeServer): logger.exception("Error replicating from %r", replication_url) yield sleep(30) + def get_event_cache_size(self): + return self.worker_config.event_cache_size + def setup(worker_name, config_options): try: @@ -255,8 +257,6 @@ def setup(worker_name, config_options): setup_logging(worker_config.log_config, worker_config.log_file) - clobber_with_worker_config(config, worker_config) - if config.start_pushers: sys.stderr.write( "\nThe pushers must be disabled in the main synapse process" diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 4443c73e6a..d10bb2b3f0 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -20,7 +20,6 @@ from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.config.workers import clobber_with_worker_config from synapse.events import FrozenEvent from synapse.handlers.presence import PresenceHandler from synapse.http.site import SynapseSite @@ -404,6 +403,9 @@ class SynchrotronServer(HomeServer): def build_typing_handler(self): return SynchrotronTyping(self) + def get_event_cache_size(self): + return self.worker_config.event_cache_size + def start(worker_name, config_options): try: @@ -418,8 +420,6 @@ def start(worker_name, config_options): setup_logging(worker_config.log_config, worker_config.log_file) - clobber_with_worker_config(config, worker_config) - database_engine = create_engine(config.database_config) ss = SynchrotronServer( diff --git a/synapse/config/workers.py b/synapse/config/workers.py index f2c77ef59a..503358e03e 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -33,20 +33,6 @@ Worker = collections.namedtuple("Worker", [ ]) -def clobber_with_worker_config(config, worker_config): - """Overrides some of the keys of the main config with worker-specific - values. We only need to override the keys that are accessed deep - withing synapse code. Most of the keys that we want to override in - the workers are accessed in setup code that is rewritten specifically - for the workers. In that new code we can access the worker config directly, - so we don't need to override the values in the main config.""" - - # TODO: The event_cache_size is accessed in the db setup. It should be - # possible to rejigg that code so that the cache size is pulled from the - # worker config directly. - config.event_cache_size = worker_config.event_cache_size - - def read_worker_config(config): return Worker( app=config["app"], diff --git a/synapse/server.py b/synapse/server.py index dd4b81c658..b3c31ece73 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -236,6 +236,9 @@ class HomeServer(object): def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) + def get_event_cache_size(self): + return self.config.event_cache_size + def _make_dependency_method(depname): def _get(hs): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 32c6677d47..2932880cc5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -166,7 +166,7 @@ class SQLBaseStore(object): self._get_event_counters = PerformanceCounters() self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, - max_entries=hs.config.event_cache_size) + max_entries=hs.get_event_cache_size()) self._state_group_cache = DictionaryCache( "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR -- cgit 1.4.1 From a352b68acf473f59012340b7f481f3dfd6544ac6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Jun 2016 17:29:50 +0100 Subject: Use worker_ prefixes for worker config, use existing support for multiple config files --- synapse/app/pusher.py | 29 ++++++++++++--------------- synapse/app/synchrotron.py | 29 ++++++++++++--------------- synapse/config/workers.py | 49 ++++++++-------------------------------------- synapse/server.py | 3 --- synapse/storage/_base.py | 2 +- 5 files changed, 33 insertions(+), 79 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 5d4db4f892..9ac26d52c6 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -111,7 +111,7 @@ class PusherServer(HomeServer): def remove_pusher(self, app_id, push_key, user_id): http_client = self.get_simple_http_client() - replication_url = self.worker_config.replication_url + replication_url = self.config.worker_replication_url url = replication_url + "/remove_pushers" return http_client.post_json_get_json(url, { "remove": [{ @@ -165,7 +165,7 @@ class PusherServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.worker_config.replication_url + replication_url = self.config.worker_replication_url pusher_pool = self.get_pusherpool() clock = self.get_clock() @@ -240,11 +240,8 @@ class PusherServer(HomeServer): logger.exception("Error replicating from %r", replication_url) yield sleep(30) - def get_event_cache_size(self): - return self.worker_config.event_cache_size - -def setup(worker_name, config_options): +def start(config_options): try: config = HomeServerConfig.load_config( "Synapse pusher", config_options @@ -253,9 +250,9 @@ def setup(worker_name, config_options): sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - worker_config = config.workers[worker_name] + assert config.worker_app == "synapse.app.pusher" - setup_logging(worker_config.log_config, worker_config.log_file) + setup_logging(config.worker_log_config, config.worker_log_file) if config.start_pushers: sys.stderr.write( @@ -275,20 +272,19 @@ def setup(worker_name, config_options): config.server_name, db_config=config.database_config, config=config, - worker_config=worker_config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, ) ps.setup() - ps.start_listening(worker_config.listeners) + ps.start_listening(config.worker_listeners) def run(): with LoggingContext("run"): logger.info("Running") - change_resource_limit(worker_config.soft_file_limit) - if worker_config.gc_thresholds: - ps.set_threshold(worker_config.gc_thresholds) + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + ps.set_threshold(config.gc_thresholds) reactor.run() def start(): @@ -298,10 +294,10 @@ def setup(worker_name, config_options): reactor.callWhenRunning(start) - if worker_config.daemonize: + if config.worker_daemonize: daemon = Daemonize( app="synapse-pusher", - pid=worker_config.pid_file, + pid=config.worker_pid_file, action=run, auto_close_fds=False, verbose=True, @@ -314,5 +310,4 @@ def setup(worker_name, config_options): if __name__ == '__main__': with LoggingContext("main"): - worker_name = sys.argv[1] - ps = setup(worker_name, sys.argv[2:]) + ps = start(sys.argv[1:]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d10bb2b3f0..160db8637e 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -97,7 +97,7 @@ class SynchrotronPresence(object): self.http_client = hs.get_simple_http_client() self.store = hs.get_datastore() self.user_to_num_current_syncs = {} - self.syncing_users_url = hs.worker_config.replication_url + "/syncing_users" + self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users" self.clock = hs.get_clock() active_presence = self.store.take_presence_startup_info() @@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.worker_config.replication_url + replication_url = self.config.worker_replication_url clock = self.get_clock() notifier = self.get_notifier() presence_handler = self.get_presence_handler() @@ -403,11 +403,8 @@ class SynchrotronServer(HomeServer): def build_typing_handler(self): return SynchrotronTyping(self) - def get_event_cache_size(self): - return self.worker_config.event_cache_size - -def start(worker_name, config_options): +def start(config_options): try: config = HomeServerConfig.load_config( "Synapse synchrotron", config_options @@ -416,9 +413,9 @@ def start(worker_name, config_options): sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - worker_config = config.workers[worker_name] + assert config.worker_app == "synapse.app.synchrotron" - setup_logging(worker_config.log_config, worker_config.log_file) + setup_logging(config.worker_log_config, config.worker_log_file) database_engine = create_engine(config.database_config) @@ -426,21 +423,20 @@ def start(worker_name, config_options): config.server_name, db_config=config.database_config, config=config, - worker_config=worker_config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, application_service_handler=SynchrotronApplicationService(), ) ss.setup() - ss.start_listening(worker_config.listeners) + ss.start_listening(config.worker_listeners) def run(): with LoggingContext("run"): logger.info("Running") - change_resource_limit(worker_config.soft_file_limit) - if worker_config.gc_thresholds: - ss.set_threshold(worker_config.gc_thresholds) + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + ss.set_threshold(config.gc_thresholds) reactor.run() def start(): @@ -449,10 +445,10 @@ def start(worker_name, config_options): reactor.callWhenRunning(start) - if worker_config.daemonize: + if config.worker_daemonize: daemon = Daemonize( app="synapse-synchrotron", - pid=worker_config.pid_file, + pid=config.worker_pid_file, action=run, auto_close_fds=False, verbose=True, @@ -465,5 +461,4 @@ def start(worker_name, config_options): if __name__ == '__main__': with LoggingContext("main"): - worker_name = sys.argv[1] - start(worker_name, sys.argv[2:]) + start(sys.argv[1:]) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 503358e03e..904789d155 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -13,52 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections - from ._base import Config -from .server import read_gc_thresholds - - -Worker = collections.namedtuple("Worker", [ - "app", - "listeners", - "pid_file", - "daemonize", - "log_file", - "log_config", - "event_cache_size", - "soft_file_limit", - "gc_thresholds", - "replication_url", -]) - - -def read_worker_config(config): - return Worker( - app=config["app"], - listeners=config.get("listeners", []), - pid_file=config.get("pid_file"), - daemonize=config["daemonize"], - log_file=config.get("log_file"), - log_config=config.get("log_config"), - event_cache_size=Config.parse_size(config.get("event_cache_size", "10K")), - soft_file_limit=config.get("soft_file_limit"), - gc_thresholds=read_gc_thresholds(config.get("gc_thresholds")), - replication_url=config.get("replication_url"), - ) class WorkerConfig(Config): """The workers are processes run separately to the main synapse process. - Each worker has a name that identifies it within the config file. They have their own pid_file and listener configuration. They use the - replication_url to talk to the main synapse process. They have their - own cache size tuning, gc threshold tuning and open file limits.""" + replication_url to talk to the main synapse process.""" def read_config(self, config): - workers = config.get("workers", {}) - - self.workers = { - worker_name: read_worker_config(worker_config) - for worker_name, worker_config in workers.items() - } + self.worker_app = config.get("worker_app") + self.worker_listeners = config.get("worker_listeners") + self.worker_daemonize = config.get("worker_daemonize") + self.worker_pid_file = config.get("worker_pid_file") + self.worker_log_file = config.get("worker_log_file") + self.worker_log_config = config.get("worker_log_config") + self.worker_replication_url = config.get("worker_replication_url") diff --git a/synapse/server.py b/synapse/server.py index b3c31ece73..dd4b81c658 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -236,9 +236,6 @@ class HomeServer(object): def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) - def get_event_cache_size(self): - return self.config.event_cache_size - def _make_dependency_method(depname): def _get(hs): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2932880cc5..32c6677d47 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -166,7 +166,7 @@ class SQLBaseStore(object): self._get_event_counters = PerformanceCounters() self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, - max_entries=hs.get_event_cache_size()) + max_entries=hs.config.event_cache_size) self._state_group_cache = DictionaryCache( "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR -- cgit 1.4.1 From 8c75040c25495bf29f4c76ca0fcc032975210012 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 17 Jun 2016 11:48:12 +0100 Subject: Fix setting gc thresholds in the workers --- synapse/app/pusher.py | 3 ++- synapse/app/synchrotron.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9ac26d52c6..4f1d18ab5f 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -43,6 +43,7 @@ from daemonize import Daemonize import sys import logging +import gc logger = logging.getLogger("synapse.app.pusher") @@ -284,7 +285,7 @@ def start(config_options): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: - ps.set_threshold(config.gc_thresholds) + gc.set_threshold(*config.gc_thresholds) reactor.run() def start(): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 160db8637e..8cf5bbbb6d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -56,6 +56,7 @@ from daemonize import Daemonize import sys import logging import contextlib +import gc import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -436,7 +437,7 @@ def start(config_options): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: - ss.set_threshold(config.gc_thresholds) + gc.set_threshold(*config.gc_thresholds) reactor.run() def start(): -- cgit 1.4.1