summary refs log tree commit diff
path: root/synapse/app/pusher.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/app/pusher.py69
1 files changed, 67 insertions, 2 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 8922573db7..abb9f1fe8e 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -17,19 +17,25 @@
 import synapse
 
 from synapse.server import HomeServer
-from synapse.util.versionstring import get_version_string
 from synapse.config._base import ConfigError
 from synapse.config.database import DatabaseConfig
 from synapse.config.logger import LoggingConfig
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.storage.engines import create_engine
 from synapse.storage import DataStore
 from synapse.util.async import sleep
-from synapse.util.logcontext import (LoggingContext, preserve_fn)
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
 
 from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
 
 import sys
 import logging
@@ -46,12 +52,28 @@ class SlaveConfig(DatabaseConfig):
         )
         self.user_agent_suffix = None
         self.start_pushers = True
+        self.listeners = config["listeners"]
+        self.soft_file_limit = config.get("soft_file_limit")
 
     def default_config(self, **kwargs):
         return """\
         ## Slave ##
+        # The replication listener on the synapse to talk to.
         #replication_url: https://localhost:{replication_port}/_synapse/replication
 
+        listeners: []
+        # Uncomment to enable a ssh manhole listener on the pusher.
+        # - type: manhole
+        #   port: {manhole_port}
+        #   bind_address: 127.0.0.1
+        # Uncomment to enable a metric listener on the pusher.
+        # - type: http
+        #   port: {metrics_port}
+        #   bind_address: 127.0.0.1
+        #   resources:
+        #    - names: ["metrics"],
+        #      compress: False
+
         report_stats: False
         """
 
@@ -100,6 +122,46 @@ class PusherServer(HomeServer):
             }]
         })
 
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse pusher now listening on port %d", port)
+
+    def start_listening(self):
+        for listener in self.config.listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
     @defer.inlineCallbacks
     def replicate(self):
         http_client = self.get_simple_http_client()
@@ -191,6 +253,9 @@ def setup(config_options):
     )
 
     ps.setup()
+    ps.start_listening()
+
+    change_resource_limit(ps.config.soft_file_limit)
 
     def start():
         ps.replicate()