diff options
Diffstat (limited to 'synapse/app/pusher.py')
-rw-r--r-- | synapse/app/pusher.py | 69 |
1 files changed, 67 insertions, 2 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8922573db7..e6aa0aa65c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -17,19 +17,25 @@ import synapse from synapse.server import HomeServer -from synapse.util.versionstring import get_version_string from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep -from synapse.util.logcontext import (LoggingContext, preserve_fn) +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string from twisted.internet import reactor, defer +from twisted.web.resource import Resource import sys import logging @@ -46,12 +52,28 @@ class SlaveConfig(DatabaseConfig): ) self.user_agent_suffix = None self.start_pushers = True + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") def default_config(self, **kwargs): return """\ ## Slave ## + # The replication listener on the synapse to talk to. #replication_url: https://localhost:{replication_port}/_synapse/replication + listeners: [] + # Enable a ssh manhole listener on the pusher. + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Enable a metric listener on the pusher. + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"], + # compress: False + report_stats: False """ @@ -100,6 +122,46 @@ class PusherServer(HomeServer): }] }) + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse pusher now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + @defer.inlineCallbacks def replicate(self): http_client = self.get_simple_http_client() @@ -191,6 +253,9 @@ def setup(config_options): ) ps.setup() + ps.start_listening() + + change_resource_limit(ps.config.soft_file_limit) def start(): ps.replicate() |