diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 135dd58c15..c8dde0fcb8 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -18,9 +18,8 @@ import synapse
from synapse.server import HomeServer
from synapse.config._base import ConfigError
-from synapse.config.database import DatabaseConfig
-from synapse.config.logger import LoggingConfig
-from synapse.config.emailconfig import EmailConfig
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
from synapse.http.site import SynapseSite
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.storage.roommember import RoomMemberStore
@@ -44,61 +43,11 @@ from daemonize import Daemonize
import sys
import logging
+import gc
logger = logging.getLogger("synapse.app.pusher")
-class SlaveConfig(DatabaseConfig):
- def read_config(self, config):
- self.replication_url = config["replication_url"]
- self.server_name = config["server_name"]
- self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
- "use_insecure_ssl_client_just_for_testing_do_not_use", False
- )
- self.user_agent_suffix = None
- self.start_pushers = True
- self.listeners = config["listeners"]
- self.soft_file_limit = config.get("soft_file_limit")
- self.daemonize = config.get("daemonize")
- self.pid_file = self.abspath(config.get("pid_file"))
- self.public_baseurl = config["public_baseurl"]
-
- def default_config(self, server_name, **kwargs):
- pid_file = self.abspath("pusher.pid")
- return """\
- # Slave configuration
-
- # The replication listener on the synapse to talk to.
- #replication_url: https://localhost:{replication_port}/_synapse/replication
-
- server_name: "%(server_name)s"
-
- listeners: []
- # Enable a ssh manhole listener on the pusher.
- # - type: manhole
- # port: {manhole_port}
- # bind_address: 127.0.0.1
- # Enable a metric listener on the pusher.
- # - type: http
- # port: {metrics_port}
- # bind_address: 127.0.0.1
- # resources:
- # - names: ["metrics"]
- # compress: False
-
- report_stats: False
-
- daemonize: False
-
- pid_file: %(pid_file)s
-
- """ % locals()
-
-
-class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig):
- pass
-
-
class PusherSlaveStore(
SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
SlavedAccountDataStore
@@ -163,7 +112,7 @@ class PusherServer(HomeServer):
def remove_pusher(self, app_id, push_key, user_id):
http_client = self.get_simple_http_client()
- replication_url = self.config.replication_url
+ replication_url = self.config.worker_replication_url
url = replication_url + "/remove_pushers"
return http_client.post_json_get_json(url, {
"remove": [{
@@ -196,8 +145,8 @@ class PusherServer(HomeServer):
)
logger.info("Synapse pusher now listening on port %d", port)
- def start_listening(self):
- for listener in self.config.listeners:
+ def start_listening(self, listeners):
+ for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
@@ -217,7 +166,7 @@ class PusherServer(HomeServer):
def replicate(self):
http_client = self.get_simple_http_client()
store = self.get_datastore()
- replication_url = self.config.replication_url
+ replication_url = self.config.worker_replication_url
pusher_pool = self.get_pusherpool()
clock = self.get_clock()
@@ -290,22 +239,33 @@ class PusherServer(HomeServer):
poke_pushers(result)
except:
logger.exception("Error replicating from %r", replication_url)
- sleep(30)
+ yield sleep(30)
-def setup(config_options):
+def start(config_options):
try:
- config = PusherSlaveConfig.load_config(
+ config = HomeServerConfig.load_config(
"Synapse pusher", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.exit(1)
- if not config:
- sys.exit(0)
+ assert config.worker_app == "synapse.app.pusher"
+
+ setup_logging(config.worker_log_config, config.worker_log_file)
+
+ if config.start_pushers:
+ sys.stderr.write(
+ "\nThe pushers must be disabled in the main synapse process"
+ "\nbefore they can be run in a separate worker."
+ "\nPlease add ``start_pushers: false`` to the main config"
+ "\n"
+ )
+ sys.exit(1)
- config.setup_logging()
+ # Force the pushers to start since they will be disabled in the main config
+ config.start_pushers = True
database_engine = create_engine(config.database_config)
@@ -313,14 +273,20 @@ def setup(config_options):
config.server_name,
db_config=config.database_config,
config=config,
- version_string=get_version_string("Synapse", synapse),
+ version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
ps.setup()
- ps.start_listening()
-
- change_resource_limit(ps.config.soft_file_limit)
+ ps.start_listening(config.worker_listeners)
+
+ def run():
+ with LoggingContext("run"):
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ gc.set_threshold(*config.gc_thresholds)
+ reactor.run()
def start():
ps.replicate()
@@ -329,28 +295,20 @@ def setup(config_options):
reactor.callWhenRunning(start)
- return ps
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-pusher",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
if __name__ == '__main__':
with LoggingContext("main"):
- ps = setup(sys.argv[1:])
-
- if ps.config.daemonize:
- def run():
- with LoggingContext("run"):
- change_resource_limit(ps.config.soft_file_limit)
- reactor.run()
-
- daemon = Daemonize(
- app="synapse-pusher",
- pid=ps.config.pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
-
- daemon.start()
- else:
- reactor.run()
+ ps = start(sys.argv[1:])
|