From a3ac837599f62b77f458505f841cee6072c1f921 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 Apr 2016 17:21:02 +0100 Subject: Optionally split out the pushers into a separate process --- synapse/app/pusher.py | 208 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 208 insertions(+) create mode 100644 synapse/app/pusher.py (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py new file mode 100644 index 0000000000..8922573db7 --- /dev/null +++ b/synapse/app/pusher.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.server import HomeServer +from synapse.util.versionstring import get_version_string +from synapse.config._base import ConfigError +from synapse.config.database import DatabaseConfig +from synapse.config.logger import LoggingConfig +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.storage.engines import create_engine +from synapse.storage import DataStore +from synapse.util.async import sleep +from synapse.util.logcontext import (LoggingContext, preserve_fn) + +from twisted.internet import reactor, defer + +import sys +import logging + +logger = logging.getLogger("synapse.app.pusher") + + +class SlaveConfig(DatabaseConfig): + def read_config(self, config): + self.replication_url = config["replication_url"] + self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None + self.start_pushers = True + + def default_config(self, **kwargs): + return """\ + ## Slave ## + #replication_url: https://localhost:{replication_port}/_synapse/replication + + report_stats: False + """ + + +class PusherSlaveConfig(SlaveConfig, LoggingConfig): + pass + + +class PusherSlaveStore( + SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore +): + update_pusher_last_stream_ordering_and_success = ( + DataStore.update_pusher_last_stream_ordering_and_success.__func__ + ) + + +class PusherServer(HomeServer): + + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = PusherSlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def remove_pusher(self, app_id, push_key, user_id): + http_client = self.get_simple_http_client() + replication_url = self.config.replication_url + url = replication_url + "/remove_pushers" + return http_client.post_json_get_json(url, { + "remove": [{ + "app_id": app_id, + "push_key": push_key, + "user_id": user_id, + }] + }) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + pusher_pool = self.get_pusherpool() + + def stop_pusher(user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + pushers_for_user = pusher_pool.pushers.get(user_id, {}) + pusher = pushers_for_user.pop(key, None) + if pusher is None: + return + logger.info("Stopping pusher %r / %r", user_id, key) + pusher.on_stop() + + def start_pusher(user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + logger.info("Starting pusher %r / %r", user_id, key) + return pusher_pool._refresh_pusher(app_id, pushkey, user_id) + + @defer.inlineCallbacks + def poke_pushers(results): + pushers_rows = set( + map(tuple, results.get("pushers", {}).get("rows", [])) + ) + deleted_pushers_rows = set( + map(tuple, results.get("deleted_pushers", {}).get("rows", [])) + ) + for row in sorted(pushers_rows | deleted_pushers_rows): + if row in deleted_pushers_rows: + user_id, app_id, pushkey = row[1:4] + stop_pusher(user_id, app_id, pushkey) + elif row in pushers_rows: + user_id = row[1] + app_id = row[5] + pushkey = row[8] + yield start_pusher(user_id, app_id, pushkey) + + stream = results.get("events") + if stream: + min_stream_id = stream["rows"][0][0] + max_stream_id = stream["position"] + preserve_fn(pusher_pool.on_new_notifications)( + min_stream_id, max_stream_id + ) + + stream = results.get("receipts") + if stream: + rows = stream["rows"] + affected_room_ids = set(row[1] for row in rows) + min_stream_id = rows[0][0] + max_stream_id = stream["position"] + preserve_fn(pusher_pool.on_new_receipts)( + min_stream_id, max_stream_id, affected_room_ids + ) + + while True: + try: + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + yield store.process_replication(result) + poke_pushers(result) + except: + logger.exception("Error replicating from %r", replication_url) + sleep(30) + + +def setup(config_options): + try: + config = PusherSlaveConfig.load_config( + "Synapse pusher", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + config.setup_logging() + + database_engine = create_engine(config.database_config) + + ps = PusherServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + ) + + ps.setup() + + def start(): + ps.replicate() + ps.get_pusherpool().start() + ps.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + return ps + + +if __name__ == '__main__': + with LoggingContext("main"): + ps = setup(sys.argv[1:]) + reactor.run() -- cgit 1.4.1 From 72e2fafa207c28581c62bcce2f1a6ede410fee5a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Apr 2016 17:34:25 +0100 Subject: Add a metrics listener and a ssh listener to the pusher --- synapse/app/pusher.py | 69 +++++++++++++++++++++++++++++++++++++++++++++++-- synapse/util/manhole.py | 26 ++++++++++++++++++- 2 files changed, 92 insertions(+), 3 deletions(-) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8922573db7..abb9f1fe8e 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -17,19 +17,25 @@ import synapse from synapse.server import HomeServer -from synapse.util.versionstring import get_version_string from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep -from synapse.util.logcontext import (LoggingContext, preserve_fn) +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string from twisted.internet import reactor, defer +from twisted.web.resource import Resource import sys import logging @@ -46,12 +52,28 @@ class SlaveConfig(DatabaseConfig): ) self.user_agent_suffix = None self.start_pushers = True + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") def default_config(self, **kwargs): return """\ ## Slave ## + # The replication listener on the synapse to talk to. #replication_url: https://localhost:{replication_port}/_synapse/replication + listeners: [] + # Uncomment to enable a ssh manhole listener on the pusher. + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Uncomment to enable a metric listener on the pusher. + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"], + # compress: False + report_stats: False """ @@ -100,6 +122,46 @@ class PusherServer(HomeServer): }] }) + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse pusher now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + @defer.inlineCallbacks def replicate(self): http_client = self.get_simple_http_client() @@ -191,6 +253,9 @@ def setup(config_options): ) ps.setup() + ps.start_listening() + + change_resource_limit(ps.config.soft_file_limit) def start(): ps.replicate() diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py index 9b106cdf47..97e0f00b67 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py @@ -16,6 +16,26 @@ from twisted.conch.manhole import ColoredManhole from twisted.conch.insults import insults from twisted.conch import manhole_ssh from twisted.cred import checkers, portal +from twisted.conch.ssh.keys import Key + +PUBLIC_KEY = ( + "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az" + "64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJS" + "kbh/C+BR3utDS555mV" +) + +PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY----- +MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW +4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw +vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb +Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1 +xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8 +PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2 +gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu +DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML +pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP +EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg== +-----END RSA PRIVATE KEY-----""" def manhole(username, password, globals): @@ -43,4 +63,8 @@ def manhole(username, password, globals): dict(globals, __name__="__console__") ) - return manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY) + factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY) + + return factory -- cgit 1.4.1 From f15e9e8de4329ed7f726d3405c8aa37a52c8db24 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Apr 2016 17:56:24 +0100 Subject: Remove the uncomments from the comments --- synapse/app/pusher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index abb9f1fe8e..e6aa0aa65c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -62,11 +62,11 @@ class SlaveConfig(DatabaseConfig): #replication_url: https://localhost:{replication_port}/_synapse/replication listeners: [] - # Uncomment to enable a ssh manhole listener on the pusher. + # Enable a ssh manhole listener on the pusher. # - type: manhole # port: {manhole_port} # bind_address: 127.0.0.1 - # Uncomment to enable a metric listener on the pusher. + # Enable a metric listener on the pusher. # - type: http # port: {metrics_port} # bind_address: 127.0.0.1 -- cgit 1.4.1 From 9c417c54d461be6877b58220311c7c1bb83dabb1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 10:45:02 +0100 Subject: Add a couple of update methods to the PusherSlaveStore --- synapse/app/pusher.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index e6aa0aa65c..9381fe2251 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -89,6 +89,14 @@ class PusherSlaveStore( DataStore.update_pusher_last_stream_ordering_and_success.__func__ ) + update_pusher_failing_since = ( + DataStore.update_pusher_failing_since.__func__ + ) + + update_pusher_last_stream_ordering = ( + DataStore.update_pusher_last_stream_ordering.__func__ + ) + class PusherServer(HomeServer): -- cgit 1.4.1 From 6df5a6a833620a6388c90850be4d457bc4c5c4eb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 15:37:41 +0100 Subject: Optionally daemonize the pusher --- synapse/app/pusher.py | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9381fe2251..5f3200cf4c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -37,6 +37,8 @@ from synapse.util.versionstring import get_version_string from twisted.internet import reactor, defer from twisted.web.resource import Resource +from daemonize import Daemonize + import sys import logging @@ -54,13 +56,19 @@ class SlaveConfig(DatabaseConfig): self.start_pushers = True self.listeners = config["listeners"] self.soft_file_limit = config.get("soft_file_limit") + self.daemonize = config.get("daemonize") + self.pid_file = self.abspath(config.get("pid_file")) - def default_config(self, **kwargs): + def default_config(self, server_name, **kwargs): + pid_file = self.abspath("pusher.pid") return """\ - ## Slave ## + # Slave configuration + # The replication listener on the synapse to talk to. #replication_url: https://localhost:{replication_port}/_synapse/replication + server_name: "%(server_name)s" + listeners: [] # Enable a ssh manhole listener on the pusher. # - type: manhole @@ -75,7 +83,12 @@ class SlaveConfig(DatabaseConfig): # compress: False report_stats: False - """ + + daemonize: False + + pid_file: %(pid_file)s + + """ % locals() class PusherSlaveConfig(SlaveConfig, LoggingConfig): @@ -248,6 +261,9 @@ def setup(config_options): sys.stderr.write("\n" + e.message + "\n") sys.exit(1) + if not config: + sys.exit(0) + config.setup_logging() database_engine = create_engine(config.database_config) @@ -278,4 +294,15 @@ def setup(config_options): if __name__ == '__main__': with LoggingContext("main"): ps = setup(sys.argv[1:]) - reactor.run() + + if ps.config.daemonize: + daemon = Daemonize( + app="synapse-pusher", + pid=ps.config.pid_file, + action=reactor.run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + else: + reactor.run() -- cgit 1.4.1 From b80b93ea0f47f9854bc093c72f4f0bd42898fabe Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 15:57:28 +0100 Subject: Add a log context to the daemonized pusher --- synapse/app/pusher.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 5f3200cf4c..a67650b5d1 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -296,10 +296,15 @@ if __name__ == '__main__': ps = setup(sys.argv[1:]) if ps.config.daemonize: + def run(): + with LoggingContext("run"): + change_resource_limit(ps.config.soft_file_limit) + reactor.run() + daemon = Daemonize( app="synapse-pusher", pid=ps.config.pid_file, - action=reactor.run, + action=run, auto_close_fds=False, verbose=True, logger=logger, -- cgit 1.4.1 From c9eab73f2a5c0e61ffd0de46d8bd4750f53e7ccb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 17:06:04 +0100 Subject: Fix typo in default pusher config --- synapse/app/pusher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index a67650b5d1..230156559e 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -79,7 +79,7 @@ class SlaveConfig(DatabaseConfig): # port: {metrics_port} # bind_address: 127.0.0.1 # resources: - # - names: ["metrics"], + # - names: ["metrics"] # compress: False report_stats: False -- cgit 1.4.1 From 71df32719050892eae42cd741eaffc3dcf2eb603 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 17:07:09 +0100 Subject: Actually start the pusher daemon --- synapse/app/pusher.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/app/pusher.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 230156559e..b5339f030d 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -309,5 +309,7 @@ if __name__ == '__main__': verbose=True, logger=logger, ) + + daemon.start() else: reactor.run() -- cgit 1.4.1