summary refs log tree commit diff
path: root/synapse/app/pusher.py
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2016-08-11 14:09:13 +0100
committerDavid Baker <dave@matrix.org>2016-08-11 14:09:13 +0100
commitb4ecf0b886c67437901e0af457c5f801ebde9a72 (patch)
treeef66b0684edcfeb4ad68d20375641f4654393f44 /synapse/app/pusher.py
parentInclude the ts the notif was received at (diff)
parentMerge pull request #1003 from matrix-org/erikj/redaction_prev_content (diff)
downloadsynapse-b4ecf0b886c67437901e0af457c5f801ebde9a72.tar.xz
Merge remote-tracking branch 'origin/develop' into dbkr/notifications_api
Diffstat (limited to 'synapse/app/pusher.py')
-rw-r--r--synapse/app/pusher.py136
1 files changed, 47 insertions, 89 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 135dd58c15..c8dde0fcb8 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -18,9 +18,8 @@ import synapse
 
 from synapse.server import HomeServer
 from synapse.config._base import ConfigError
-from synapse.config.database import DatabaseConfig
-from synapse.config.logger import LoggingConfig
-from synapse.config.emailconfig import EmailConfig
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
 from synapse.http.site import SynapseSite
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.storage.roommember import RoomMemberStore
@@ -44,61 +43,11 @@ from daemonize import Daemonize
 
 import sys
 import logging
+import gc
 
 logger = logging.getLogger("synapse.app.pusher")
 
 
-class SlaveConfig(DatabaseConfig):
-    def read_config(self, config):
-        self.replication_url = config["replication_url"]
-        self.server_name = config["server_name"]
-        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
-            "use_insecure_ssl_client_just_for_testing_do_not_use", False
-        )
-        self.user_agent_suffix = None
-        self.start_pushers = True
-        self.listeners = config["listeners"]
-        self.soft_file_limit = config.get("soft_file_limit")
-        self.daemonize = config.get("daemonize")
-        self.pid_file = self.abspath(config.get("pid_file"))
-        self.public_baseurl = config["public_baseurl"]
-
-    def default_config(self, server_name, **kwargs):
-        pid_file = self.abspath("pusher.pid")
-        return """\
-        # Slave configuration
-
-        # The replication listener on the synapse to talk to.
-        #replication_url: https://localhost:{replication_port}/_synapse/replication
-
-        server_name: "%(server_name)s"
-
-        listeners: []
-        # Enable a ssh manhole listener on the pusher.
-        # - type: manhole
-        #   port: {manhole_port}
-        #   bind_address: 127.0.0.1
-        # Enable a metric listener on the pusher.
-        # - type: http
-        #   port: {metrics_port}
-        #   bind_address: 127.0.0.1
-        #   resources:
-        #    - names: ["metrics"]
-        #      compress: False
-
-        report_stats: False
-
-        daemonize: False
-
-        pid_file: %(pid_file)s
-
-        """ % locals()
-
-
-class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig):
-    pass
-
-
 class PusherSlaveStore(
     SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
     SlavedAccountDataStore
@@ -163,7 +112,7 @@ class PusherServer(HomeServer):
 
     def remove_pusher(self, app_id, push_key, user_id):
         http_client = self.get_simple_http_client()
-        replication_url = self.config.replication_url
+        replication_url = self.config.worker_replication_url
         url = replication_url + "/remove_pushers"
         return http_client.post_json_get_json(url, {
             "remove": [{
@@ -196,8 +145,8 @@ class PusherServer(HomeServer):
         )
         logger.info("Synapse pusher now listening on port %d", port)
 
-    def start_listening(self):
-        for listener in self.config.listeners:
+    def start_listening(self, listeners):
+        for listener in listeners:
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
@@ -217,7 +166,7 @@ class PusherServer(HomeServer):
     def replicate(self):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
-        replication_url = self.config.replication_url
+        replication_url = self.config.worker_replication_url
         pusher_pool = self.get_pusherpool()
         clock = self.get_clock()
 
@@ -290,22 +239,33 @@ class PusherServer(HomeServer):
                 poke_pushers(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
-                sleep(30)
+                yield sleep(30)
 
 
-def setup(config_options):
+def start(config_options):
     try:
-        config = PusherSlaveConfig.load_config(
+        config = HomeServerConfig.load_config(
             "Synapse pusher", config_options
         )
     except ConfigError as e:
         sys.stderr.write("\n" + e.message + "\n")
         sys.exit(1)
 
-    if not config:
-        sys.exit(0)
+    assert config.worker_app == "synapse.app.pusher"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    if config.start_pushers:
+        sys.stderr.write(
+            "\nThe pushers must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``start_pushers: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
 
-    config.setup_logging()
+    # Force the pushers to start since they will be disabled in the main config
+    config.start_pushers = True
 
     database_engine = create_engine(config.database_config)
 
@@ -313,14 +273,20 @@ def setup(config_options):
         config.server_name,
         db_config=config.database_config,
         config=config,
-        version_string=get_version_string("Synapse", synapse),
+        version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
     )
 
     ps.setup()
-    ps.start_listening()
-
-    change_resource_limit(ps.config.soft_file_limit)
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
 
     def start():
         ps.replicate()
@@ -329,28 +295,20 @@ def setup(config_options):
 
     reactor.callWhenRunning(start)
 
-    return ps
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-pusher",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
 
 
 if __name__ == '__main__':
     with LoggingContext("main"):
-        ps = setup(sys.argv[1:])
-
-        if ps.config.daemonize:
-            def run():
-                with LoggingContext("run"):
-                    change_resource_limit(ps.config.soft_file_limit)
-                    reactor.run()
-
-            daemon = Daemonize(
-                app="synapse-pusher",
-                pid=ps.config.pid_file,
-                action=run,
-                auto_close_fds=False,
-                verbose=True,
-                logger=logger,
-            )
-
-            daemon.start()
-        else:
-            reactor.run()
+        ps = start(sys.argv[1:])