diff options
Diffstat (limited to 'synapse/app/pusher.py')
-rw-r--r-- | synapse/app/pusher.py | 136 |
1 files changed, 47 insertions, 89 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 135dd58c15..c8dde0fcb8 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -18,9 +18,8 @@ import synapse from synapse.server import HomeServer from synapse.config._base import ConfigError -from synapse.config.database import DatabaseConfig -from synapse.config.logger import LoggingConfig -from synapse.config.emailconfig import EmailConfig +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.storage.roommember import RoomMemberStore @@ -44,61 +43,11 @@ from daemonize import Daemonize import sys import logging +import gc logger = logging.getLogger("synapse.app.pusher") -class SlaveConfig(DatabaseConfig): - def read_config(self, config): - self.replication_url = config["replication_url"] - self.server_name = config["server_name"] - self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( - "use_insecure_ssl_client_just_for_testing_do_not_use", False - ) - self.user_agent_suffix = None - self.start_pushers = True - self.listeners = config["listeners"] - self.soft_file_limit = config.get("soft_file_limit") - self.daemonize = config.get("daemonize") - self.pid_file = self.abspath(config.get("pid_file")) - self.public_baseurl = config["public_baseurl"] - - def default_config(self, server_name, **kwargs): - pid_file = self.abspath("pusher.pid") - return """\ - # Slave configuration - - # The replication listener on the synapse to talk to. - #replication_url: https://localhost:{replication_port}/_synapse/replication - - server_name: "%(server_name)s" - - listeners: [] - # Enable a ssh manhole listener on the pusher. - # - type: manhole - # port: {manhole_port} - # bind_address: 127.0.0.1 - # Enable a metric listener on the pusher. - # - type: http - # port: {metrics_port} - # bind_address: 127.0.0.1 - # resources: - # - names: ["metrics"] - # compress: False - - report_stats: False - - daemonize: False - - pid_file: %(pid_file)s - - """ % locals() - - -class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig): - pass - - class PusherSlaveStore( SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore @@ -163,7 +112,7 @@ class PusherServer(HomeServer): def remove_pusher(self, app_id, push_key, user_id): http_client = self.get_simple_http_client() - replication_url = self.config.replication_url + replication_url = self.config.worker_replication_url url = replication_url + "/remove_pushers" return http_client.post_json_get_json(url, { "remove": [{ @@ -196,8 +145,8 @@ class PusherServer(HomeServer): ) logger.info("Synapse pusher now listening on port %d", port) - def start_listening(self): - for listener in self.config.listeners: + def start_listening(self, listeners): + for listener in listeners: if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": @@ -217,7 +166,7 @@ class PusherServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.config.replication_url + replication_url = self.config.worker_replication_url pusher_pool = self.get_pusherpool() clock = self.get_clock() @@ -290,22 +239,33 @@ class PusherServer(HomeServer): poke_pushers(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(30) + yield sleep(30) -def setup(config_options): +def start(config_options): try: - config = PusherSlaveConfig.load_config( + config = HomeServerConfig.load_config( "Synapse pusher", config_options ) except ConfigError as e: sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - if not config: - sys.exit(0) + assert config.worker_app == "synapse.app.pusher" + + setup_logging(config.worker_log_config, config.worker_log_file) + + if config.start_pushers: + sys.stderr.write( + "\nThe pushers must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``start_pushers: false`` to the main config" + "\n" + ) + sys.exit(1) - config.setup_logging() + # Force the pushers to start since they will be disabled in the main config + config.start_pushers = True database_engine = create_engine(config.database_config) @@ -313,14 +273,20 @@ def setup(config_options): config.server_name, db_config=config.database_config, config=config, - version_string=get_version_string("Synapse", synapse), + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, ) ps.setup() - ps.start_listening() - - change_resource_limit(ps.config.soft_file_limit) + ps.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() def start(): ps.replicate() @@ -329,28 +295,20 @@ def setup(config_options): reactor.callWhenRunning(start) - return ps + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-pusher", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() if __name__ == '__main__': with LoggingContext("main"): - ps = setup(sys.argv[1:]) - - if ps.config.daemonize: - def run(): - with LoggingContext("run"): - change_resource_limit(ps.config.soft_file_limit) - reactor.run() - - daemon = Daemonize( - app="synapse-pusher", - pid=ps.config.pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - - daemon.start() - else: - reactor.run() + ps = start(sys.argv[1:]) |