diff options
-rwxr-xr-x | synapse/app/homeserver.py | 29 | ||||
-rw-r--r-- | synapse/app/pusher.py | 77 | ||||
-rw-r--r-- | synapse/util/manhole.py | 70 |
3 files changed, 151 insertions, 25 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 1fa93be93e..df675c0ed4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -32,13 +32,6 @@ from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_d from synapse.server import HomeServer - -from twisted.conch.manhole import ColoredManhole -from twisted.conch.insults import insults -from twisted.conch import manhole_ssh -from twisted.cred import checkers, portal - - from twisted.internet import reactor, task, defer from twisted.application import service from twisted.web.resource import Resource, EncodingResourceWrapper @@ -64,6 +57,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.manhole import manhole from synapse.http.site import SynapseSite @@ -209,24 +203,13 @@ class SynapseHomeServer(HomeServer): if listener["type"] == "http": self._listener_http(config, listener) elif listener["type"] == "manhole": - checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( - matrix="rabbithole" - ) - - rlm = manhole_ssh.TerminalRealm() - rlm.chainedProtocolFactory = lambda: insults.ServerProtocol( - ColoredManhole, - { - "__name__": "__console__", - "hs": self, - } - ) - - f = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) - reactor.listenTCP( listener["port"], - f, + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), interface=listener.get("bind_address", '127.0.0.1') ) else: diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8922573db7..9381fe2251 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -17,19 +17,25 @@ import synapse from synapse.server import HomeServer -from synapse.util.versionstring import get_version_string from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep -from synapse.util.logcontext import (LoggingContext, preserve_fn) +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string from twisted.internet import reactor, defer +from twisted.web.resource import Resource import sys import logging @@ -46,12 +52,28 @@ class SlaveConfig(DatabaseConfig): ) self.user_agent_suffix = None self.start_pushers = True + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") def default_config(self, **kwargs): return """\ ## Slave ## + # The replication listener on the synapse to talk to. #replication_url: https://localhost:{replication_port}/_synapse/replication + listeners: [] + # Enable a ssh manhole listener on the pusher. + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Enable a metric listener on the pusher. + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"], + # compress: False + report_stats: False """ @@ -67,6 +89,14 @@ class PusherSlaveStore( DataStore.update_pusher_last_stream_ordering_and_success.__func__ ) + update_pusher_failing_since = ( + DataStore.update_pusher_failing_since.__func__ + ) + + update_pusher_last_stream_ordering = ( + DataStore.update_pusher_last_stream_ordering.__func__ + ) + class PusherServer(HomeServer): @@ -100,6 +130,46 @@ class PusherServer(HomeServer): }] }) + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse pusher now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + @defer.inlineCallbacks def replicate(self): http_client = self.get_simple_http_client() @@ -191,6 +261,9 @@ def setup(config_options): ) ps.setup() + ps.start_listening() + + change_resource_limit(ps.config.soft_file_limit) def start(): ps.replicate() diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py new file mode 100644 index 0000000000..97e0f00b67 --- /dev/null +++ b/synapse/util/manhole.py @@ -0,0 +1,70 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.conch.manhole import ColoredManhole +from twisted.conch.insults import insults +from twisted.conch import manhole_ssh +from twisted.cred import checkers, portal +from twisted.conch.ssh.keys import Key + +PUBLIC_KEY = ( + "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az" + "64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJS" + "kbh/C+BR3utDS555mV" +) + +PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY----- +MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW +4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw +vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb +Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1 +xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8 +PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2 +gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu +DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML +pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP +EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg== +-----END RSA PRIVATE KEY-----""" + + +def manhole(username, password, globals): + """Starts a ssh listener with password authentication using + the given username and password. Clients connecting to the ssh + listener will find themselves in a colored python shell with + the supplied globals. + + Args: + username(str): The username ssh clients should auth with. + password(str): The password ssh clients should auth with. + globals(dict): The variables to expose in the shell. + + Returns: + twisted.internet.protocol.Factory: A factory to pass to ``listenTCP`` + """ + + checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( + **{username: password} + ) + + rlm = manhole_ssh.TerminalRealm() + rlm.chainedProtocolFactory = lambda: insults.ServerProtocol( + ColoredManhole, + dict(globals, __name__="__console__") + ) + + factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY) + factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY) + + return factory |