diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 8922573db7..abb9f1fe8e 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -17,19 +17,25 @@
import synapse
from synapse.server import HomeServer
-from synapse.util.versionstring import get_version_string
from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.storage.engines import create_engine
from synapse.storage import DataStore
from synapse.util.async import sleep
-from synapse.util.logcontext import (LoggingContext, preserve_fn)
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
import sys
import logging
@@ -46,12 +52,28 @@ class SlaveConfig(DatabaseConfig):
)
self.user_agent_suffix = None
self.start_pushers = True
+ self.listeners = config["listeners"]
+ self.soft_file_limit = config.get("soft_file_limit")
def default_config(self, **kwargs):
return """\
## Slave ##
+ # The replication listener on the synapse to talk to.
#replication_url: https://localhost:{replication_port}/_synapse/replication
+ listeners: []
+ # Uncomment to enable a ssh manhole listener on the pusher.
+ # - type: manhole
+ # port: {manhole_port}
+ # bind_address: 127.0.0.1
+ # Uncomment to enable a metric listener on the pusher.
+ # - type: http
+ # port: {metrics_port}
+ # bind_address: 127.0.0.1
+ # resources:
+ # - names: ["metrics"],
+ # compress: False
+
report_stats: False
"""
@@ -100,6 +122,46 @@ class PusherServer(HomeServer):
}]
})
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+
+ root_resource = create_resource_tree(resources, Resource())
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=bind_address
+ )
+ logger.info("Synapse pusher now listening on port %d", port)
+
+ def start_listening(self):
+ for listener in self.config.listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
@defer.inlineCallbacks
def replicate(self):
http_client = self.get_simple_http_client()
@@ -191,6 +253,9 @@ def setup(config_options):
)
ps.setup()
+ ps.start_listening()
+
+ change_resource_limit(ps.config.soft_file_limit)
def start():
ps.replicate()
|