From 8aab9d87fa6739345810f0edf3982fe7f898ee30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Apr 2016 14:08:18 +0100 Subject: Don't require config to create database --- synapse/app/homeserver.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fcdc8e6e10..2b4473b9ac 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -33,7 +33,7 @@ from synapse.python_dependencies import ( from synapse.rest import ClientRestResource from synapse.storage.engines import create_engine, IncorrectDatabaseSetup from synapse.storage import are_all_users_on_domain -from synapse.storage.prepare_database import UpgradeDatabaseException +from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database from synapse.server import HomeServer @@ -245,7 +245,7 @@ class SynapseHomeServer(HomeServer): except IncorrectDatabaseSetup as e: quit_with_error(e.message) - def get_db_conn(self): + def get_db_conn(self, run_new_connection=True): # Any param beginning with cp_ is a parameter for adbapi, and should # not be passed to the database engine. db_params = { @@ -254,7 +254,8 @@ class SynapseHomeServer(HomeServer): } db_conn = self.database_engine.module.connect(**db_params) - self.database_engine.on_new_connection(db_conn) + if run_new_connection: + self.database_engine.on_new_connection(db_conn) return db_conn @@ -386,7 +387,7 @@ def setup(config_options): tls_server_context_factory = context_factory.ServerContextFactory(config) - database_engine = create_engine(config) + database_engine = create_engine(config.database_config) config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection hs = SynapseHomeServer( @@ -402,8 +403,10 @@ def setup(config_options): logger.info("Preparing database: %s...", config.database_config['name']) try: - db_conn = hs.get_db_conn() - database_engine.prepare_database(db_conn) + db_conn = hs.get_db_conn(run_new_connection=False) + prepare_database(db_conn, database_engine, config=config) + database_engine.on_new_connection(db_conn) + hs.run_startup_checks(db_conn, database_engine) db_conn.commit() -- cgit 1.5.1 From 82d7eea7e3127445d745b5a1e2f2c636a590067e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 11 Apr 2016 14:57:09 +0100 Subject: Move the versionstring code out of app.homeserver into util --- synapse/app/homeserver.py | 87 ++----------------------------------------- synapse/util/rlimit.py | 37 ++++++++++++++++++ synapse/util/versionstring.py | 84 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 83 deletions(-) create mode 100644 synapse/util/rlimit.py create mode 100644 synapse/util/versionstring.py (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 2b4473b9ac..d2085a9405 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -20,8 +20,6 @@ import contextlib import logging import os import re -import resource -import subprocess import sys import time from synapse.config._base import ConfigError @@ -66,6 +64,9 @@ from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX from synapse.federation.transport.server import TransportLayerServer +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string + from synapse import events from daemonize import Daemonize @@ -269,86 +270,6 @@ def quit_with_error(error_string): sys.exit(1) -def get_version_string(): - try: - null = open(os.devnull, 'w') - cwd = os.path.dirname(os.path.abspath(__file__)) - try: - git_branch = subprocess.check_output( - ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], - stderr=null, - cwd=cwd, - ).strip() - git_branch = "b=" + git_branch - except subprocess.CalledProcessError: - git_branch = "" - - try: - git_tag = subprocess.check_output( - ['git', 'describe', '--exact-match'], - stderr=null, - cwd=cwd, - ).strip() - git_tag = "t=" + git_tag - except subprocess.CalledProcessError: - git_tag = "" - - try: - git_commit = subprocess.check_output( - ['git', 'rev-parse', '--short', 'HEAD'], - stderr=null, - cwd=cwd, - ).strip() - except subprocess.CalledProcessError: - git_commit = "" - - try: - dirty_string = "-this_is_a_dirty_checkout" - is_dirty = subprocess.check_output( - ['git', 'describe', '--dirty=' + dirty_string], - stderr=null, - cwd=cwd, - ).strip().endswith(dirty_string) - - git_dirty = "dirty" if is_dirty else "" - except subprocess.CalledProcessError: - git_dirty = "" - - if git_branch or git_tag or git_commit or git_dirty: - git_version = ",".join( - s for s in - (git_branch, git_tag, git_commit, git_dirty,) - if s - ) - - return ( - "Synapse/%s (%s)" % ( - synapse.__version__, git_version, - ) - ).encode("ascii") - except Exception as e: - logger.info("Failed to check for git repository: %s", e) - - return ("Synapse/%s" % (synapse.__version__,)).encode("ascii") - - -def change_resource_limit(soft_file_no): - try: - soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) - - if not soft_file_no: - soft_file_no = hard - - resource.setrlimit(resource.RLIMIT_NOFILE, (soft_file_no, hard)) - logger.info("Set file limit to: %d", soft_file_no) - - resource.setrlimit( - resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY) - ) - except (ValueError, resource.error) as e: - logger.warn("Failed to set file or core limit: %s", e) - - def setup(config_options): """ Args: @@ -378,7 +299,7 @@ def setup(config_options): # check any extra requirements we have now we have a config check_requirements(config) - version_string = get_version_string() + version_string = get_version_string("Synapse", synapse) logger.info("Server hostname: %s", config.server_name) logger.info("Server version: %s", version_string) diff --git a/synapse/util/rlimit.py b/synapse/util/rlimit.py new file mode 100644 index 0000000000..f4a9abf83f --- /dev/null +++ b/synapse/util/rlimit.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import resource +import logging + + +logger = logging.getLogger("synapse.app.homeserver") + + +def change_resource_limit(soft_file_no): + try: + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + + if not soft_file_no: + soft_file_no = hard + + resource.setrlimit(resource.RLIMIT_NOFILE, (soft_file_no, hard)) + logger.info("Set file limit to: %d", soft_file_no) + + resource.setrlimit( + resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY) + ) + except (ValueError, resource.error) as e: + logger.warn("Failed to set file or core limit: %s", e) diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py new file mode 100644 index 0000000000..a4f156cb3b --- /dev/null +++ b/synapse/util/versionstring.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess +import os +import logging + +logger = logging.getLogger(__name__) + + +def get_version_string(name, module): + try: + null = open(os.devnull, 'w') + cwd = os.path.dirname(os.path.abspath(module.__file__)) + try: + git_branch = subprocess.check_output( + ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], + stderr=null, + cwd=cwd, + ).strip() + git_branch = "b=" + git_branch + except subprocess.CalledProcessError: + git_branch = "" + + try: + git_tag = subprocess.check_output( + ['git', 'describe', '--exact-match'], + stderr=null, + cwd=cwd, + ).strip() + git_tag = "t=" + git_tag + except subprocess.CalledProcessError: + git_tag = "" + + try: + git_commit = subprocess.check_output( + ['git', 'rev-parse', '--short', 'HEAD'], + stderr=null, + cwd=cwd, + ).strip() + except subprocess.CalledProcessError: + git_commit = "" + + try: + dirty_string = "-this_is_a_dirty_checkout" + is_dirty = subprocess.check_output( + ['git', 'describe', '--dirty=' + dirty_string], + stderr=null, + cwd=cwd, + ).strip().endswith(dirty_string) + + git_dirty = "dirty" if is_dirty else "" + except subprocess.CalledProcessError: + git_dirty = "" + + if git_branch or git_tag or git_commit or git_dirty: + git_version = ",".join( + s for s in + (git_branch, git_tag, git_commit, git_dirty,) + if s + ) + + return ( + "%s/%s (%s)" % ( + name, module.__version__, git_version, + ) + ).encode("ascii") + except Exception as e: + logger.info("Failed to check for git repository: %s", e) + + return ("%s/%s" % (name, module.__version__,)).encode("ascii") -- cgit 1.5.1 From a3ac837599f62b77f458505f841cee6072c1f921 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 Apr 2016 17:21:02 +0100 Subject: Optionally split out the pushers into a separate process --- synapse/app/pusher.py | 208 +++++++++++++++++++++++++++++++++++++++++++++ synapse/config/server.py | 1 + synapse/push/httppusher.py | 2 +- synapse/push/pusherpool.py | 4 + synapse/server.py | 3 + 5 files changed, 217 insertions(+), 1 deletion(-) create mode 100644 synapse/app/pusher.py (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py new file mode 100644 index 0000000000..8922573db7 --- /dev/null +++ b/synapse/app/pusher.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.server import HomeServer +from synapse.util.versionstring import get_version_string +from synapse.config._base import ConfigError +from synapse.config.database import DatabaseConfig +from synapse.config.logger import LoggingConfig +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.storage.engines import create_engine +from synapse.storage import DataStore +from synapse.util.async import sleep +from synapse.util.logcontext import (LoggingContext, preserve_fn) + +from twisted.internet import reactor, defer + +import sys +import logging + +logger = logging.getLogger("synapse.app.pusher") + + +class SlaveConfig(DatabaseConfig): + def read_config(self, config): + self.replication_url = config["replication_url"] + self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None + self.start_pushers = True + + def default_config(self, **kwargs): + return """\ + ## Slave ## + #replication_url: https://localhost:{replication_port}/_synapse/replication + + report_stats: False + """ + + +class PusherSlaveConfig(SlaveConfig, LoggingConfig): + pass + + +class PusherSlaveStore( + SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore +): + update_pusher_last_stream_ordering_and_success = ( + DataStore.update_pusher_last_stream_ordering_and_success.__func__ + ) + + +class PusherServer(HomeServer): + + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = PusherSlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def remove_pusher(self, app_id, push_key, user_id): + http_client = self.get_simple_http_client() + replication_url = self.config.replication_url + url = replication_url + "/remove_pushers" + return http_client.post_json_get_json(url, { + "remove": [{ + "app_id": app_id, + "push_key": push_key, + "user_id": user_id, + }] + }) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + pusher_pool = self.get_pusherpool() + + def stop_pusher(user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + pushers_for_user = pusher_pool.pushers.get(user_id, {}) + pusher = pushers_for_user.pop(key, None) + if pusher is None: + return + logger.info("Stopping pusher %r / %r", user_id, key) + pusher.on_stop() + + def start_pusher(user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + logger.info("Starting pusher %r / %r", user_id, key) + return pusher_pool._refresh_pusher(app_id, pushkey, user_id) + + @defer.inlineCallbacks + def poke_pushers(results): + pushers_rows = set( + map(tuple, results.get("pushers", {}).get("rows", [])) + ) + deleted_pushers_rows = set( + map(tuple, results.get("deleted_pushers", {}).get("rows", [])) + ) + for row in sorted(pushers_rows | deleted_pushers_rows): + if row in deleted_pushers_rows: + user_id, app_id, pushkey = row[1:4] + stop_pusher(user_id, app_id, pushkey) + elif row in pushers_rows: + user_id = row[1] + app_id = row[5] + pushkey = row[8] + yield start_pusher(user_id, app_id, pushkey) + + stream = results.get("events") + if stream: + min_stream_id = stream["rows"][0][0] + max_stream_id = stream["position"] + preserve_fn(pusher_pool.on_new_notifications)( + min_stream_id, max_stream_id + ) + + stream = results.get("receipts") + if stream: + rows = stream["rows"] + affected_room_ids = set(row[1] for row in rows) + min_stream_id = rows[0][0] + max_stream_id = stream["position"] + preserve_fn(pusher_pool.on_new_receipts)( + min_stream_id, max_stream_id, affected_room_ids + ) + + while True: + try: + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + yield store.process_replication(result) + poke_pushers(result) + except: + logger.exception("Error replicating from %r", replication_url) + sleep(30) + + +def setup(config_options): + try: + config = PusherSlaveConfig.load_config( + "Synapse pusher", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + config.setup_logging() + + database_engine = create_engine(config.database_config) + + ps = PusherServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + ) + + ps.setup() + + def start(): + ps.replicate() + ps.get_pusherpool().start() + ps.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + return ps + + +if __name__ == '__main__': + with LoggingContext("main"): + ps = setup(sys.argv[1:]) + reactor.run() diff --git a/synapse/config/server.py b/synapse/config/server.py index df4707e1d1..46c633548a 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -28,6 +28,7 @@ class ServerConfig(Config): self.print_pidfile = config.get("print_pidfile") self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", True) + self.start_pushers = config.get("start_pushers", True) self.listeners = config.get("listeners", []) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 6950a20632..3992804845 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -230,7 +230,7 @@ class HttpPusher(object): "Pushkey %s was rejected: removing", pk ) - yield self.hs.get_pusherpool().remove_pusher( + yield self.hs.remove_pusher( self.app_id, pk, self.user_id ) defer.returnValue(True) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index aa095f9d9b..6ef48d63f7 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -29,6 +29,7 @@ logger = logging.getLogger(__name__) class PusherPool: def __init__(self, _hs): self.hs = _hs + self.start_pushers = _hs.config.start_pushers self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() self.pushers = {} @@ -177,6 +178,9 @@ class PusherPool: self._start_pushers([p]) def _start_pushers(self, pushers): + if not self.start_pushers: + logger.info("Not starting pushers because they are disabled in the config") + return logger.info("Starting %d pushers", len(pushers)) for pusherdict in pushers: try: diff --git a/synapse/server.py b/synapse/server.py index 368d615576..ee138de756 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -193,6 +193,9 @@ class HomeServer(object): **self.db_config.get("args", {}) ) + def remove_pusher(self, app_id, push_key, user_id): + return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) + def _make_dependency_method(depname): def _get(hs): -- cgit 1.5.1 From 9e7aa98c229af4f657756f9089654d2eab7a96ce Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 Apr 2016 15:40:51 +0100 Subject: Split out create_resource_tree to a separate file --- synapse/app/homeserver.py | 89 +++--------------------------------- synapse/util/httpresourcetree.py | 98 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 82 deletions(-) create mode 100644 synapse/util/httpresourcetree.py (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index d2085a9405..fdadffeba7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -66,6 +66,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string +from synapse.util.httpresourcetree import create_resource_tree from synapse import events @@ -174,7 +175,12 @@ class SynapseHomeServer(HomeServer): if name == "replication": resources[REPLICATION_PREFIX] = ReplicationResource(self) - root_resource = create_resource_tree(resources) + if WEB_CLIENT_PREFIX in resources: + root_resource = RootRedirect(WEB_CLIENT_PREFIX) + else: + root_resource = Resource() + + root_resource = create_resource_tree(resources, root_resource) if tls: reactor.listenSSL( port, @@ -494,87 +500,6 @@ class SynapseSite(Site): pass -def create_resource_tree(desired_tree, redirect_root_to_web_client=True): - """Create the resource tree for this Home Server. - - This in unduly complicated because Twisted does not support putting - child resources more than 1 level deep at a time. - - Args: - web_client (bool): True to enable the web client. - redirect_root_to_web_client (bool): True to redirect '/' to the - location of the web client. This does nothing if web_client is not - True. - """ - if redirect_root_to_web_client and WEB_CLIENT_PREFIX in desired_tree: - root_resource = RootRedirect(WEB_CLIENT_PREFIX) - else: - root_resource = Resource() - - # ideally we'd just use getChild and putChild but getChild doesn't work - # unless you give it a Request object IN ADDITION to the name :/ So - # instead, we'll store a copy of this mapping so we can actually add - # extra resources to existing nodes. See self._resource_id for the key. - resource_mappings = {} - for full_path, res in desired_tree.items(): - logger.info("Attaching %s to path %s", res, full_path) - last_resource = root_resource - for path_seg in full_path.split('/')[1:-1]: - if path_seg not in last_resource.listNames(): - # resource doesn't exist, so make a "dummy resource" - child_resource = Resource() - last_resource.putChild(path_seg, child_resource) - res_id = _resource_id(last_resource, path_seg) - resource_mappings[res_id] = child_resource - last_resource = child_resource - else: - # we have an existing Resource, use that instead. - res_id = _resource_id(last_resource, path_seg) - last_resource = resource_mappings[res_id] - - # =========================== - # now attach the actual desired resource - last_path_seg = full_path.split('/')[-1] - - # if there is already a resource here, thieve its children and - # replace it - res_id = _resource_id(last_resource, last_path_seg) - if res_id in resource_mappings: - # there is a dummy resource at this path already, which needs - # to be replaced with the desired resource. - existing_dummy_resource = resource_mappings[res_id] - for child_name in existing_dummy_resource.listNames(): - child_res_id = _resource_id( - existing_dummy_resource, child_name - ) - child_resource = resource_mappings[child_res_id] - # steal the children - res.putChild(child_name, child_resource) - - # finally, insert the desired resource in the right place - last_resource.putChild(last_path_seg, res) - res_id = _resource_id(last_resource, last_path_seg) - resource_mappings[res_id] = res - - return root_resource - - -def _resource_id(resource, path_seg): - """Construct an arbitrary resource ID so you can retrieve the mapping - later. - - If you want to represent resource A putChild resource B with path C, - the mapping should looks like _resource_id(A,C) = B. - - Args: - resource (Resource): The *parent* Resourceb - path_seg (str): The name of the child Resource to be attached. - Returns: - str: A unique string which can be a key to the child Resource. - """ - return "%s-%s" % (resource, path_seg) - - def run(hs): PROFILE_SYNAPSE = False if PROFILE_SYNAPSE: diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py new file mode 100644 index 0000000000..45be47159a --- /dev/null +++ b/synapse/util/httpresourcetree.py @@ -0,0 +1,98 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.web.resource import Resource + +import logging + +logger = logging.getLogger(__name__) + + +def create_resource_tree(desired_tree, root_resource): + """Create the resource tree for this Home Server. + + This in unduly complicated because Twisted does not support putting + child resources more than 1 level deep at a time. + + Args: + web_client (bool): True to enable the web client. + root_resource (twisted.web.resource.Resource): The root + resource to add the tree to. + Returns: + twisted.web.resource.Resource: the ``root_resource`` with a tree of + child resources added to it. + """ + + # ideally we'd just use getChild and putChild but getChild doesn't work + # unless you give it a Request object IN ADDITION to the name :/ So + # instead, we'll store a copy of this mapping so we can actually add + # extra resources to existing nodes. See self._resource_id for the key. + resource_mappings = {} + for full_path, res in desired_tree.items(): + logger.info("Attaching %s to path %s", res, full_path) + last_resource = root_resource + for path_seg in full_path.split('/')[1:-1]: + if path_seg not in last_resource.listNames(): + # resource doesn't exist, so make a "dummy resource" + child_resource = Resource() + last_resource.putChild(path_seg, child_resource) + res_id = _resource_id(last_resource, path_seg) + resource_mappings[res_id] = child_resource + last_resource = child_resource + else: + # we have an existing Resource, use that instead. + res_id = _resource_id(last_resource, path_seg) + last_resource = resource_mappings[res_id] + + # =========================== + # now attach the actual desired resource + last_path_seg = full_path.split('/')[-1] + + # if there is already a resource here, thieve its children and + # replace it + res_id = _resource_id(last_resource, last_path_seg) + if res_id in resource_mappings: + # there is a dummy resource at this path already, which needs + # to be replaced with the desired resource. + existing_dummy_resource = resource_mappings[res_id] + for child_name in existing_dummy_resource.listNames(): + child_res_id = _resource_id( + existing_dummy_resource, child_name + ) + child_resource = resource_mappings[child_res_id] + # steal the children + res.putChild(child_name, child_resource) + + # finally, insert the desired resource in the right place + last_resource.putChild(last_path_seg, res) + res_id = _resource_id(last_resource, last_path_seg) + resource_mappings[res_id] = res + + return root_resource + + +def _resource_id(resource, path_seg): + """Construct an arbitrary resource ID so you can retrieve the mapping + later. + + If you want to represent resource A putChild resource B with path C, + the mapping should looks like _resource_id(A,C) = B. + + Args: + resource (Resource): The *parent* Resourceb + path_seg (str): The name of the child Resource to be attached. + Returns: + str: A unique string which can be a key to the child Resource. + """ + return "%s-%s" % (resource, path_seg) -- cgit 1.5.1 From e856036f4c0b2744ef44a25f16b409ddb8c693e1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 Apr 2016 16:09:55 +0100 Subject: Move SynapseSite to its own file --- synapse/app/homeserver.py | 133 +---------------------------------------- synapse/http/site.py | 146 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+), 130 deletions(-) create mode 100644 synapse/http/site.py (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index d2085a9405..2818c55b7a 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -16,12 +16,9 @@ import synapse -import contextlib import logging import os -import re import sys -import time from synapse.config._base import ConfigError from synapse.python_dependencies import ( @@ -46,7 +43,7 @@ from twisted.internet import reactor, task, defer from twisted.application import service from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File -from twisted.web.server import Site, GzipEncoderFactory, Request +from twisted.web.server import GzipEncoderFactory from synapse.http.server import RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource @@ -67,6 +64,8 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string +from synapse.http.site import SynapseSite + from synapse import events from daemonize import Daemonize @@ -74,9 +73,6 @@ from daemonize import Daemonize logger = logging.getLogger("synapse.app.homeserver") -ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') - - def gz_wrap(r): return EncodingResourceWrapper(r, [GzipEncoderFactory()]) @@ -371,129 +367,6 @@ class SynapseService(service.Service): return self._port.stopListening() -class SynapseRequest(Request): - def __init__(self, site, *args, **kw): - Request.__init__(self, *args, **kw) - self.site = site - self.authenticated_entity = None - self.start_time = 0 - - def __repr__(self): - # We overwrite this so that we don't log ``access_token`` - return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % ( - self.__class__.__name__, - id(self), - self.method, - self.get_redacted_uri(), - self.clientproto, - self.site.site_tag, - ) - - def get_redacted_uri(self): - return ACCESS_TOKEN_RE.sub( - r'\1\3', - self.uri - ) - - def get_user_agent(self): - return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] - - def started_processing(self): - self.site.access_logger.info( - "%s - %s - Received request: %s %s", - self.getClientIP(), - self.site.site_tag, - self.method, - self.get_redacted_uri() - ) - self.start_time = int(time.time() * 1000) - - def finished_processing(self): - - try: - context = LoggingContext.current_context() - ru_utime, ru_stime = context.get_resource_usage() - db_txn_count = context.db_txn_count - db_txn_duration = context.db_txn_duration - except: - ru_utime, ru_stime = (0, 0) - db_txn_count, db_txn_duration = (0, 0) - - self.site.access_logger.info( - "%s - %s - {%s}" - " Processed request: %dms (%dms, %dms) (%dms/%d)" - " %sB %s \"%s %s %s\" \"%s\"", - self.getClientIP(), - self.site.site_tag, - self.authenticated_entity, - int(time.time() * 1000) - self.start_time, - int(ru_utime * 1000), - int(ru_stime * 1000), - int(db_txn_duration * 1000), - int(db_txn_count), - self.sentLength, - self.code, - self.method, - self.get_redacted_uri(), - self.clientproto, - self.get_user_agent(), - ) - - @contextlib.contextmanager - def processing(self): - self.started_processing() - yield - self.finished_processing() - - -class XForwardedForRequest(SynapseRequest): - def __init__(self, *args, **kw): - SynapseRequest.__init__(self, *args, **kw) - - """ - Add a layer on top of another request that only uses the value of an - X-Forwarded-For header as the result of C{getClientIP}. - """ - def getClientIP(self): - """ - @return: The client address (the first address) in the value of the - I{X-Forwarded-For header}. If the header is not present, return - C{b"-"}. - """ - return self.requestHeaders.getRawHeaders( - b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() - - -class SynapseRequestFactory(object): - def __init__(self, site, x_forwarded_for): - self.site = site - self.x_forwarded_for = x_forwarded_for - - def __call__(self, *args, **kwargs): - if self.x_forwarded_for: - return XForwardedForRequest(self.site, *args, **kwargs) - else: - return SynapseRequest(self.site, *args, **kwargs) - - -class SynapseSite(Site): - """ - Subclass of a twisted http Site that does access logging with python's - standard logging - """ - def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): - Site.__init__(self, resource, *args, **kwargs) - - self.site_tag = site_tag - - proxied = config.get("x_forwarded", False) - self.requestFactory = SynapseRequestFactory(self, proxied) - self.access_logger = logging.getLogger(logger_name) - - def log(self, request): - pass - - def create_resource_tree(desired_tree, redirect_root_to_web_client=True): """Create the resource tree for this Home Server. diff --git a/synapse/http/site.py b/synapse/http/site.py new file mode 100644 index 0000000000..4b09d7ee66 --- /dev/null +++ b/synapse/http/site.py @@ -0,0 +1,146 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.logcontext import LoggingContext +from twisted.web.server import Site, Request + +import contextlib +import logging +import re +import time + +ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') + + +class SynapseRequest(Request): + def __init__(self, site, *args, **kw): + Request.__init__(self, *args, **kw) + self.site = site + self.authenticated_entity = None + self.start_time = 0 + + def __repr__(self): + # We overwrite this so that we don't log ``access_token`` + return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % ( + self.__class__.__name__, + id(self), + self.method, + self.get_redacted_uri(), + self.clientproto, + self.site.site_tag, + ) + + def get_redacted_uri(self): + return ACCESS_TOKEN_RE.sub( + r'\1\3', + self.uri + ) + + def get_user_agent(self): + return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] + + def started_processing(self): + self.site.access_logger.info( + "%s - %s - Received request: %s %s", + self.getClientIP(), + self.site.site_tag, + self.method, + self.get_redacted_uri() + ) + self.start_time = int(time.time() * 1000) + + def finished_processing(self): + + try: + context = LoggingContext.current_context() + ru_utime, ru_stime = context.get_resource_usage() + db_txn_count = context.db_txn_count + db_txn_duration = context.db_txn_duration + except: + ru_utime, ru_stime = (0, 0) + db_txn_count, db_txn_duration = (0, 0) + + self.site.access_logger.info( + "%s - %s - {%s}" + " Processed request: %dms (%dms, %dms) (%dms/%d)" + " %sB %s \"%s %s %s\" \"%s\"", + self.getClientIP(), + self.site.site_tag, + self.authenticated_entity, + int(time.time() * 1000) - self.start_time, + int(ru_utime * 1000), + int(ru_stime * 1000), + int(db_txn_duration * 1000), + int(db_txn_count), + self.sentLength, + self.code, + self.method, + self.get_redacted_uri(), + self.clientproto, + self.get_user_agent(), + ) + + @contextlib.contextmanager + def processing(self): + self.started_processing() + yield + self.finished_processing() + + +class XForwardedForRequest(SynapseRequest): + def __init__(self, *args, **kw): + SynapseRequest.__init__(self, *args, **kw) + + """ + Add a layer on top of another request that only uses the value of an + X-Forwarded-For header as the result of C{getClientIP}. + """ + def getClientIP(self): + """ + @return: The client address (the first address) in the value of the + I{X-Forwarded-For header}. If the header is not present, return + C{b"-"}. + """ + return self.requestHeaders.getRawHeaders( + b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() + + +class SynapseRequestFactory(object): + def __init__(self, site, x_forwarded_for): + self.site = site + self.x_forwarded_for = x_forwarded_for + + def __call__(self, *args, **kwargs): + if self.x_forwarded_for: + return XForwardedForRequest(self.site, *args, **kwargs) + else: + return SynapseRequest(self.site, *args, **kwargs) + + +class SynapseSite(Site): + """ + Subclass of a twisted http Site that does access logging with python's + standard logging + """ + def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): + Site.__init__(self, resource, *args, **kwargs) + + self.site_tag = site_tag + + proxied = config.get("x_forwarded", False) + self.requestFactory = SynapseRequestFactory(self, proxied) + self.access_logger = logging.getLogger(logger_name) + + def log(self, request): + pass -- cgit 1.5.1 From 5905f36f0557f2b496e5b2759db295a3b2807574 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 Apr 2016 17:08:02 +0100 Subject: Split out setting up the manhole to a separate file --- synapse/app/homeserver.py | 33 +++++++------------------------ synapse/util/manhole.py | 50 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 26 deletions(-) create mode 100644 synapse/util/manhole.py (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 1fa93be93e..b033073ef7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -32,13 +32,6 @@ from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_d from synapse.server import HomeServer - -from twisted.conch.manhole import ColoredManhole -from twisted.conch.insults import insults -from twisted.conch import manhole_ssh -from twisted.cred import checkers, portal - - from twisted.internet import reactor, task, defer from twisted.application import service from twisted.web.resource import Resource, EncodingResourceWrapper @@ -64,6 +57,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.manhole import listen_manhole from synapse.http.site import SynapseSite @@ -209,25 +203,12 @@ class SynapseHomeServer(HomeServer): if listener["type"] == "http": self._listener_http(config, listener) elif listener["type"] == "manhole": - checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( - matrix="rabbithole" - ) - - rlm = manhole_ssh.TerminalRealm() - rlm.chainedProtocolFactory = lambda: insults.ServerProtocol( - ColoredManhole, - { - "__name__": "__console__", - "hs": self, - } - ) - - f = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) - - reactor.listenTCP( - listener["port"], - f, - interface=listener.get("bind_address", '127.0.0.1') + listen_manhole( + bind_address=listener.get("bind_address", '127.0.0.1'), + bind_port=listener["port"], + username="matrix", + password="rabbithole", + globals={"hs": self}, ) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py new file mode 100644 index 0000000000..e12583209f --- /dev/null +++ b/synapse/util/manhole.py @@ -0,0 +1,50 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.conch.manhole import ColoredManhole +from twisted.conch.insults import insults +from twisted.conch import manhole_ssh +from twisted.cred import checkers, portal + + +from twisted.internet import reactor + + +def listen_manhole(bind_address, bind_port, username, password, globals): + """Starts a ssh listener with password authentication using + the given username and password. Clients connecting to the ssh + listener will find themselves in a colored python shell with + the supplied globals. + + Args: + bind_address(str): IP address to listen on. + bind_port(int): TCP port to listen on. + username(str): The username ssh clients should auth with. + password(str): The password ssh clients should auth with. + globals(dict): The variables to expose in the shell. + """ + + checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( + **{username: password} + ) + + rlm = manhole_ssh.TerminalRealm() + rlm.chainedProtocolFactory = lambda: insults.ServerProtocol( + ColoredManhole, + dict(globals, __name__="__console__") + ) + + factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + + reactor.listenTCP(bind_port, factory, interface=bind_address) -- cgit 1.5.1 From f22f46f4f902e071fe322854a228f8fe53677cdc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Apr 2016 14:59:21 +0100 Subject: Move the listenTCP call outside the manhole function --- synapse/app/homeserver.py | 16 +++++++++------- synapse/util/manhole.py | 14 +++++--------- 2 files changed, 14 insertions(+), 16 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b033073ef7..df675c0ed4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -57,7 +57,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.manhole import listen_manhole +from synapse.util.manhole import manhole from synapse.http.site import SynapseSite @@ -203,12 +203,14 @@ class SynapseHomeServer(HomeServer): if listener["type"] == "http": self._listener_http(config, listener) elif listener["type"] == "manhole": - listen_manhole( - bind_address=listener.get("bind_address", '127.0.0.1'), - bind_port=listener["port"], - username="matrix", - password="rabbithole", - globals={"hs": self}, + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') ) else: logger.warn("Unrecognized listener type: %s", listener["type"]) diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py index e12583209f..9b106cdf47 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py @@ -18,21 +18,19 @@ from twisted.conch import manhole_ssh from twisted.cred import checkers, portal -from twisted.internet import reactor - - -def listen_manhole(bind_address, bind_port, username, password, globals): +def manhole(username, password, globals): """Starts a ssh listener with password authentication using the given username and password. Clients connecting to the ssh listener will find themselves in a colored python shell with the supplied globals. Args: - bind_address(str): IP address to listen on. - bind_port(int): TCP port to listen on. username(str): The username ssh clients should auth with. password(str): The password ssh clients should auth with. globals(dict): The variables to expose in the shell. + + Returns: + twisted.internet.protocol.Factory: A factory to pass to ``listenTCP`` """ checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( @@ -45,6 +43,4 @@ def listen_manhole(bind_address, bind_port, username, password, globals): dict(globals, __name__="__console__") ) - factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) - - reactor.listenTCP(bind_port, factory, interface=bind_address) + return manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) -- cgit 1.5.1 From 72e2fafa207c28581c62bcce2f1a6ede410fee5a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Apr 2016 17:34:25 +0100 Subject: Add a metrics listener and a ssh listener to the pusher --- synapse/app/pusher.py | 69 +++++++++++++++++++++++++++++++++++++++++++++++-- synapse/util/manhole.py | 26 ++++++++++++++++++- 2 files changed, 92 insertions(+), 3 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8922573db7..abb9f1fe8e 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -17,19 +17,25 @@ import synapse from synapse.server import HomeServer -from synapse.util.versionstring import get_version_string from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep -from synapse.util.logcontext import (LoggingContext, preserve_fn) +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string from twisted.internet import reactor, defer +from twisted.web.resource import Resource import sys import logging @@ -46,12 +52,28 @@ class SlaveConfig(DatabaseConfig): ) self.user_agent_suffix = None self.start_pushers = True + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") def default_config(self, **kwargs): return """\ ## Slave ## + # The replication listener on the synapse to talk to. #replication_url: https://localhost:{replication_port}/_synapse/replication + listeners: [] + # Uncomment to enable a ssh manhole listener on the pusher. + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Uncomment to enable a metric listener on the pusher. + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"], + # compress: False + report_stats: False """ @@ -100,6 +122,46 @@ class PusherServer(HomeServer): }] }) + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse pusher now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + @defer.inlineCallbacks def replicate(self): http_client = self.get_simple_http_client() @@ -191,6 +253,9 @@ def setup(config_options): ) ps.setup() + ps.start_listening() + + change_resource_limit(ps.config.soft_file_limit) def start(): ps.replicate() diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py index 9b106cdf47..97e0f00b67 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py @@ -16,6 +16,26 @@ from twisted.conch.manhole import ColoredManhole from twisted.conch.insults import insults from twisted.conch import manhole_ssh from twisted.cred import checkers, portal +from twisted.conch.ssh.keys import Key + +PUBLIC_KEY = ( + "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az" + "64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJS" + "kbh/C+BR3utDS555mV" +) + +PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY----- +MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW +4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw +vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb +Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1 +xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8 +PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2 +gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu +DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML +pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP +EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg== +-----END RSA PRIVATE KEY-----""" def manhole(username, password, globals): @@ -43,4 +63,8 @@ def manhole(username, password, globals): dict(globals, __name__="__console__") ) - return manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) + factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY) + factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY) + + return factory -- cgit 1.5.1 From f15e9e8de4329ed7f726d3405c8aa37a52c8db24 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Apr 2016 17:56:24 +0100 Subject: Remove the uncomments from the comments --- synapse/app/pusher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index abb9f1fe8e..e6aa0aa65c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -62,11 +62,11 @@ class SlaveConfig(DatabaseConfig): #replication_url: https://localhost:{replication_port}/_synapse/replication listeners: [] - # Uncomment to enable a ssh manhole listener on the pusher. + # Enable a ssh manhole listener on the pusher. # - type: manhole # port: {manhole_port} # bind_address: 127.0.0.1 - # Uncomment to enable a metric listener on the pusher. + # Enable a metric listener on the pusher. # - type: http # port: {metrics_port} # bind_address: 127.0.0.1 -- cgit 1.5.1 From 9c417c54d461be6877b58220311c7c1bb83dabb1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 10:45:02 +0100 Subject: Add a couple of update methods to the PusherSlaveStore --- synapse/app/pusher.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index e6aa0aa65c..9381fe2251 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -89,6 +89,14 @@ class PusherSlaveStore( DataStore.update_pusher_last_stream_ordering_and_success.__func__ ) + update_pusher_failing_since = ( + DataStore.update_pusher_failing_since.__func__ + ) + + update_pusher_last_stream_ordering = ( + DataStore.update_pusher_last_stream_ordering.__func__ + ) + class PusherServer(HomeServer): -- cgit 1.5.1 From 6df5a6a833620a6388c90850be4d457bc4c5c4eb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 15:37:41 +0100 Subject: Optionally daemonize the pusher --- synapse/app/pusher.py | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9381fe2251..5f3200cf4c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -37,6 +37,8 @@ from synapse.util.versionstring import get_version_string from twisted.internet import reactor, defer from twisted.web.resource import Resource +from daemonize import Daemonize + import sys import logging @@ -54,13 +56,19 @@ class SlaveConfig(DatabaseConfig): self.start_pushers = True self.listeners = config["listeners"] self.soft_file_limit = config.get("soft_file_limit") + self.daemonize = config.get("daemonize") + self.pid_file = self.abspath(config.get("pid_file")) - def default_config(self, **kwargs): + def default_config(self, server_name, **kwargs): + pid_file = self.abspath("pusher.pid") return """\ - ## Slave ## + # Slave configuration + # The replication listener on the synapse to talk to. #replication_url: https://localhost:{replication_port}/_synapse/replication + server_name: "%(server_name)s" + listeners: [] # Enable a ssh manhole listener on the pusher. # - type: manhole @@ -75,7 +83,12 @@ class SlaveConfig(DatabaseConfig): # compress: False report_stats: False - """ + + daemonize: False + + pid_file: %(pid_file)s + + """ % locals() class PusherSlaveConfig(SlaveConfig, LoggingConfig): @@ -248,6 +261,9 @@ def setup(config_options): sys.stderr.write("\n" + e.message + "\n") sys.exit(1) + if not config: + sys.exit(0) + config.setup_logging() database_engine = create_engine(config.database_config) @@ -278,4 +294,15 @@ def setup(config_options): if __name__ == '__main__': with LoggingContext("main"): ps = setup(sys.argv[1:]) - reactor.run() + + if ps.config.daemonize: + daemon = Daemonize( + app="synapse-pusher", + pid=ps.config.pid_file, + action=reactor.run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + else: + reactor.run() -- cgit 1.5.1 From b80b93ea0f47f9854bc093c72f4f0bd42898fabe Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 15:57:28 +0100 Subject: Add a log context to the daemonized pusher --- synapse/app/pusher.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 5f3200cf4c..a67650b5d1 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -296,10 +296,15 @@ if __name__ == '__main__': ps = setup(sys.argv[1:]) if ps.config.daemonize: + def run(): + with LoggingContext("run"): + change_resource_limit(ps.config.soft_file_limit) + reactor.run() + daemon = Daemonize( app="synapse-pusher", pid=ps.config.pid_file, - action=reactor.run, + action=run, auto_close_fds=False, verbose=True, logger=logger, -- cgit 1.5.1 From c9eab73f2a5c0e61ffd0de46d8bd4750f53e7ccb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 17:06:04 +0100 Subject: Fix typo in default pusher config --- synapse/app/pusher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index a67650b5d1..230156559e 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -79,7 +79,7 @@ class SlaveConfig(DatabaseConfig): # port: {metrics_port} # bind_address: 127.0.0.1 # resources: - # - names: ["metrics"], + # - names: ["metrics"] # compress: False report_stats: False -- cgit 1.5.1 From 71df32719050892eae42cd741eaffc3dcf2eb603 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Apr 2016 17:07:09 +0100 Subject: Actually start the pusher daemon --- synapse/app/pusher.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 230156559e..b5339f030d 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -309,5 +309,7 @@ if __name__ == '__main__': verbose=True, logger=logger, ) + + daemon.start() else: reactor.run() -- cgit 1.5.1 From 8cc82aad873d8c6c933a20d8c7552d061b4e6316 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 4 May 2016 11:47:59 +0100 Subject: Add db functions used for email to the pusher app --- synapse/app/pusher.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index b5339f030d..9976908983 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -110,6 +110,18 @@ class PusherSlaveStore( DataStore.update_pusher_last_stream_ordering.__func__ ) + get_throttle_params_by_room = ( + DataStore.get_throttle_params_by_room.__func__ + ) + + set_throttle_params = ( + DataStore.set_throttle_params.__func__ + ) + + get_time_of_last_push_action_before = ( + DataStore.get_time_of_last_push_action_before.__func__ + ) + class PusherServer(HomeServer): -- cgit 1.5.1 From 9ef05a12c380a6a5a45226d6086d70512fd4f073 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 4 May 2016 14:52:10 +0100 Subject: Add date header & message id --- synapse/app/pusher.py | 3 ++- synapse/push/mailer.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9976908983..89c8d5c7ce 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -20,6 +20,7 @@ from synapse.server import HomeServer from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig +from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.events import SlavedEventStore @@ -91,7 +92,7 @@ class SlaveConfig(DatabaseConfig): """ % locals() -class PusherSlaveConfig(SlaveConfig, LoggingConfig): +class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig): pass diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 1e6654e2bb..037bc990d0 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -152,6 +152,8 @@ class Mailer(object): multipart_msg['Subject'] = "New Matrix Notifications" multipart_msg['From'] = self.hs.config.email_notif_from multipart_msg['To'] = email_address + multipart_msg['Date'] = email.utils.formatdate() + multipart_msg['Message-ID'] = email.utils.make_msgid() multipart_msg.attach(text_part) multipart_msg.attach(html_part) -- cgit 1.5.1 From 0c4ccdcb83308396670b896d7b9f6c40c1e94660 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 10 May 2016 18:51:14 +0200 Subject: Also pass through get_profile_displayname --- synapse/app/pusher.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 89c8d5c7ce..9fec6869fd 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -123,6 +123,10 @@ class PusherSlaveStore( DataStore.get_time_of_last_push_action_before.__func__ ) + get_profile_displayname = ( + DataStore.get_profile_displayname.__func__ + ) + class PusherServer(HomeServer): -- cgit 1.5.1 From 3367e65476f224b12a94277552787650ab57d984 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 10 May 2016 18:53:15 +0200 Subject: Pass through get_state_groups --- synapse/app/pusher.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9fec6869fd..6f5e7a2e3f 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -127,6 +127,10 @@ class PusherSlaveStore( DataStore.get_profile_displayname.__func__ ) + get_state_groups = ( + DataStore.get_state_groups.__func__ + ) + class PusherServer(HomeServer): -- cgit 1.5.1 From 35b6e6d2a8639dba4e23372dff0be76c4d9a8936 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 10 May 2016 18:56:40 +0200 Subject: Pass though _get_state_group_for_events --- synapse/app/pusher.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 6f5e7a2e3f..d59922422c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -131,6 +131,10 @@ class PusherSlaveStore( DataStore.get_state_groups.__func__ ) + _get_state_group_for_events = ( + DataStore._get_state_group_for_events.__func__ + ) + class PusherServer(HomeServer): -- cgit 1.5.1 From 89b5ef7c4be824af4314497ad1338c2ac0e66ed8 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 10 May 2016 19:05:22 +0200 Subject: Cached functions must be accessed through the dict --- synapse/app/pusher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d59922422c..64371aaba8 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -23,6 +23,7 @@ from synapse.config.logger import LoggingConfig from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.storage.events import EventsStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore @@ -132,7 +133,7 @@ class PusherSlaveStore( ) _get_state_group_for_events = ( - DataStore._get_state_group_for_events.__func__ + EventsStore.__dict__["_get_state_group_for_events"] ) -- cgit 1.5.1 From 90afc07f39cdb01dfd33a13ae5bdfb9d89505331 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 10 May 2016 19:10:46 +0200 Subject: StateStore, not EventsStore --- synapse/app/pusher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 64371aaba8..b0a42fe67c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -23,7 +23,7 @@ from synapse.config.logger import LoggingConfig from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.storage.events import EventsStore +from synapse.storage.state import StateStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore @@ -133,7 +133,7 @@ class PusherSlaveStore( ) _get_state_group_for_events = ( - EventsStore.__dict__["_get_state_group_for_events"] + StateStore.__dict__["_get_state_group_for_events"] ) -- cgit 1.5.1 From ae1af262f6ad32ffeb4f17b19a43599b9bf9e129 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 10 May 2016 19:18:03 +0200 Subject: Pass through _get_state_group_for_events --- synapse/app/pusher.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index b0a42fe67c..8e9c0e1960 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -136,6 +136,10 @@ class PusherSlaveStore( StateStore.__dict__["_get_state_group_for_events"] ) + _get_state_group_for_event = ( + StateStore.__dict__["_get_state_group_for_event"] + ) + class PusherServer(HomeServer): -- cgit 1.5.1 From b5e646a18ce2a293e5d35dcb560ba50183a87429 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 13 May 2016 11:36:50 +0100 Subject: Make email notifs work on the pusher synapse Plus general bugfix to email notif code --- synapse/app/pusher.py | 47 +++++++++++++++++++++++++++++++++++++++++++++++ synapse/push/mailer.py | 1 + 2 files changed, 48 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8e9c0e1960..662cd0dc6b 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -24,6 +24,8 @@ from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.storage.state import StateStore +from synapse.storage.roommember import RoomMemberStore +from synapse.storage.account_data import AccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore @@ -60,6 +62,7 @@ class SlaveConfig(DatabaseConfig): self.soft_file_limit = config.get("soft_file_limit") self.daemonize = config.get("daemonize") self.pid_file = self.abspath(config.get("pid_file")) + self.public_baseurl = config["public_baseurl"] def default_config(self, server_name, **kwargs): pid_file = self.abspath("pusher.pid") @@ -132,6 +135,30 @@ class PusherSlaveStore( DataStore.get_state_groups.__func__ ) + _get_state_for_groups = ( + DataStore._get_state_for_groups.__func__ + ) + + _get_all_state_from_cache = ( + DataStore._get_all_state_from_cache.__func__ + ) + + get_events_around = ( + DataStore.get_events_around.__func__ + ) + + _get_events_around_txn = ( + DataStore._get_events_around_txn.__func__ + ) + + get_state_for_events = ( + DataStore.get_state_for_events.__func__ + ) + + _get_some_state_from_cache = ( + DataStore._get_some_state_from_cache.__func__ + ) + _get_state_group_for_events = ( StateStore.__dict__["_get_state_group_for_events"] ) @@ -140,6 +167,26 @@ class PusherSlaveStore( StateStore.__dict__["_get_state_group_for_event"] ) + _get_state_groups_from_groups = ( + StateStore.__dict__["_get_state_groups_from_groups"] + ) + + _get_state_group_from_group = ( + StateStore.__dict__["_get_state_group_from_group"] + ) + + get_global_account_data_by_type_for_users = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] + ) + + get_global_account_data_by_type_for_user = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_user"] + ) + + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + class PusherServer(HomeServer): diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 5d60c1efcf..2be294f52e 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -397,6 +397,7 @@ class Mailer(object): return "" serverAndMediaId = value[6:] + fragment = None if '#' in serverAndMediaId: (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1) fragment = "#" + fragment -- cgit 1.5.1 From 206eb9fd947ba86060340ba2154d1112570b76cd Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 13 May 2016 16:58:14 +0100 Subject: Shift some of the state_group methods into the SlavedEventStore --- synapse/app/pusher.py | 45 ----------------------------- synapse/replication/slave/storage/events.py | 19 ++++++++++++ 2 files changed, 19 insertions(+), 45 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 662cd0dc6b..9d41b62db5 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -23,7 +23,6 @@ from synapse.config.logger import LoggingConfig from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.storage.state import StateStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.account_data import AccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore @@ -131,50 +130,6 @@ class PusherSlaveStore( DataStore.get_profile_displayname.__func__ ) - get_state_groups = ( - DataStore.get_state_groups.__func__ - ) - - _get_state_for_groups = ( - DataStore._get_state_for_groups.__func__ - ) - - _get_all_state_from_cache = ( - DataStore._get_all_state_from_cache.__func__ - ) - - get_events_around = ( - DataStore.get_events_around.__func__ - ) - - _get_events_around_txn = ( - DataStore._get_events_around_txn.__func__ - ) - - get_state_for_events = ( - DataStore.get_state_for_events.__func__ - ) - - _get_some_state_from_cache = ( - DataStore._get_some_state_from_cache.__func__ - ) - - _get_state_group_for_events = ( - StateStore.__dict__["_get_state_group_for_events"] - ) - - _get_state_group_for_event = ( - StateStore.__dict__["_get_state_group_for_event"] - ) - - _get_state_groups_from_groups = ( - StateStore.__dict__["_get_state_groups_from_groups"] - ) - - _get_state_group_from_group = ( - StateStore.__dict__["_get_state_group_from_group"] - ) - get_global_account_data_by_type_for_users = ( AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] ) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 7ba7a6f6e4..0e29bd51d6 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -75,6 +75,18 @@ class SlavedEventStore(BaseSlavedStore): get_unread_event_push_actions_by_room_for_user = ( EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] ) + _get_state_group_for_events = ( + StateStore.__dict__["_get_state_group_for_events"] + ) + _get_state_group_for_event = ( + StateStore.__dict__["_get_state_group_for_event"] + ) + _get_state_groups_from_groups = ( + StateStore.__dict__["_get_state_groups_from_groups"] + ) + _get_state_group_from_group = ( + StateStore.__dict__["_get_state_group_from_group"] + ) get_unread_push_actions_for_user_in_range = ( DataStore.get_unread_push_actions_for_user_in_range.__func__ @@ -96,6 +108,9 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_room = ( DataStore.get_room_events_stream_for_room.__func__ ) + get_events_around = DataStore.get_events_around.__func__ + get_state_for_events = DataStore.get_state_for_events.__func__ + get_state_groups = DataStore.get_state_groups.__func__ _set_before_and_after = DataStore._set_before_and_after @@ -116,6 +131,10 @@ class SlavedEventStore(BaseSlavedStore): DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) _get_members_rows_txn = DataStore._get_members_rows_txn.__func__ + _get_state_for_groups = DataStore._get_state_for_groups.__func__ + _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__ + _get_events_around_txn = DataStore._get_events_around_txn.__func__ + _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() -- cgit 1.5.1 From f03ddc98ec38771a329f47c13c76dd4e93fbef16 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 13 May 2016 17:01:28 +0100 Subject: Use the SlavedAccountDataStore --- synapse/app/pusher.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9d41b62db5..8ff8329f8e 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -24,10 +24,10 @@ from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.storage.roommember import RoomMemberStore -from synapse.storage.account_data import AccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep @@ -100,7 +100,8 @@ class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig): class PusherSlaveStore( - SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore + SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, + SlavedAccountDataStore ): update_pusher_last_stream_ordering_and_success = ( DataStore.update_pusher_last_stream_ordering_and_success.__func__ @@ -130,14 +131,6 @@ class PusherSlaveStore( DataStore.get_profile_displayname.__func__ ) - get_global_account_data_by_type_for_users = ( - AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] - ) - - get_global_account_data_by_type_for_user = ( - AccountDataStore.__dict__["get_global_account_data_by_type_for_user"] - ) - who_forgot_in_room = ( RoomMemberStore.__dict__["who_forgot_in_room"] ) -- cgit 1.5.1 From b3f29dc1e59ccb3e53a124d9c0d85d26377350f4 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 13 May 2016 17:16:27 +0100 Subject: Manually expire broken caches like the who_forgot_in_room --- synapse/app/pusher.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8ff8329f8e..135dd58c15 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -131,6 +131,11 @@ class PusherSlaveStore( DataStore.get_profile_displayname.__func__ ) + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 who_forgot_in_room = ( RoomMemberStore.__dict__["who_forgot_in_room"] ) @@ -214,6 +219,7 @@ class PusherServer(HomeServer): store = self.get_datastore() replication_url = self.config.replication_url pusher_pool = self.get_pusherpool() + clock = self.get_clock() def stop_pusher(user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) @@ -265,11 +271,21 @@ class PusherServer(HomeServer): min_stream_id, max_stream_id, affected_room_ids ) + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + next_expire_broken_caches_ms = 0 while True: try: args = store.stream_positions() args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) yield store.process_replication(result) poke_pushers(result) except: -- cgit 1.5.1 From c5c5a7403bd1cb3c9e3a3529d220cb1ea55e89df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 May 2016 17:01:57 +0100 Subject: Make synctl read a cache factor from config file --- synapse/app/synctl.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index ab3a31d7b7..669a3ae92a 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -66,6 +66,10 @@ def main(): config = yaml.load(open(configfile)) pidfile = config["pid_file"] + cache_factor = config.get("synctl_cache_factor", None) + + if cache_factor: + os.environ["SYNAPSE_CACHE_FACTOR"] = cache_factor action = sys.argv[1] if sys.argv[1:] else "usage" if action == "start": -- cgit 1.5.1 From c39f305067bf2d4527322922014ff682547de103 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 May 2016 17:21:30 +0100 Subject: os.environ requires a string --- synapse/app/synctl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 669a3ae92a..39f4bf6e53 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -69,7 +69,7 @@ def main(): cache_factor = config.get("synctl_cache_factor", None) if cache_factor: - os.environ["SYNAPSE_CACHE_FACTOR"] = cache_factor + os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) action = sys.argv[1] if sys.argv[1:] else "usage" if action == "start": -- cgit 1.5.1 From 1f31cc37f8611f9ae5612ef5be82e63735fbdf34 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 2 Jun 2016 17:21:31 +0100 Subject: Working unsubscribe links going straight to the HS and authed by macaroons that let you delete pushers and nothing else --- synapse/api/auth.py | 7 +++++++ synapse/app/pusher.py | 23 ++++++++++++++++++++++- synapse/push/mailer.py | 8 ++++---- synapse/rest/client/v1/pusher.py | 4 +++- 4 files changed, 36 insertions(+), 6 deletions(-) (limited to 'synapse/app') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 463bd8b692..31e1abb964 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -660,6 +660,13 @@ class Auth(object): "is_guest": True, "token_id": None, } + elif rights == "delete_pusher": + # We don't store these tokens in the database + ret = { + "user": user, + "is_guest": False, + "token_id": None, + } else: # This codepath exists so that we can actually return a # token ID, because we use token IDs in place of device diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 135dd58c15..f1de1e7ce9 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -21,6 +21,7 @@ from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig from synapse.config.emailconfig import EmailConfig +from synapse.config.key import KeyConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.storage.roommember import RoomMemberStore @@ -63,6 +64,26 @@ class SlaveConfig(DatabaseConfig): self.pid_file = self.abspath(config.get("pid_file")) self.public_baseurl = config["public_baseurl"] + # some things used by the auth handler but not actually used in the + # pusher codebase + self.bcrypt_rounds = None + self.ldap_enabled = None + self.ldap_server = None + self.ldap_port = None + self.ldap_tls = None + self.ldap_search_base = None + self.ldap_search_property = None + self.ldap_email_property = None + self.ldap_full_name_property = None + + # We would otherwise try to use the registration shared secret as the + # macaroon shared secret if there was no macaroon_shared_secret, but + # that means pulling in RegistrationConfig too. We don't need to be + # backwards compaitible in the pusher codebase so just make people set + # macaroon_shared_secret. We set this to None to prevent it referencing + # an undefined key. + self.registration_shared_secret = None + def default_config(self, server_name, **kwargs): pid_file = self.abspath("pusher.pid") return """\ @@ -95,7 +116,7 @@ class SlaveConfig(DatabaseConfig): """ % locals() -class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig): +class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig): pass diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index e877d8fdad..60d3700afa 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -81,7 +81,7 @@ class Mailer(object): def __init__(self, hs, app_name): self.hs = hs self.store = self.hs.get_datastore() - self.handlers = self.hs.get_handlers() + self.auth_handler = self.hs.get_auth_handler() self.state_handler = self.hs.get_state_handler() loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir) self.app_name = app_name @@ -161,7 +161,7 @@ class Mailer(object): template_vars = { "user_display_name": user_display_name, - "unsubscribe_link": self.make_unsubscribe_link(app_id, email_address), + "unsubscribe_link": self.make_unsubscribe_link(user_id, app_id, email_address), "summary_text": summary_text, "app_name": self.app_name, "rooms": rooms, @@ -427,9 +427,9 @@ class Mailer(object): notif['room_id'], notif['event_id'] ) - def make_unsubscribe_link(self, app_id, email_address): + def make_unsubscribe_link(self, user_id, app_id, email_address): params = { - "access_token": self.handlers.auth.generate_delete_pusher_token(), + "access_token": self.auth_handler.generate_delete_pusher_token(user_id), "app_id": app_id, "pushkey": email_address, } diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index fa7a0992dd..9a2ed6ed88 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -149,11 +149,13 @@ class PushersRemoveRestServlet(RestServlet): def __init__(self, hs): super(RestServlet, self).__init__() + self.hs = hs self.notifier = hs.get_notifier() + self.auth = hs.get_v1auth() @defer.inlineCallbacks def on_GET(self, request): - requester = yield self.auth.get_user_by_req(request, "delete_pusher") + requester = yield self.auth.get_user_by_req(request, rights="delete_pusher") user = requester.user app_id = parse_string(request, "app_id", required=True) -- cgit 1.5.1 From abb151f3c9bf78f2825dba18da6bbc88ce61d32c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 11:57:26 +0100 Subject: Add a separate process that can handle /sync requests --- synapse/app/synchrotron.py | 467 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 467 insertions(+) create mode 100644 synapse/app/synchrotron.py (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py new file mode 100644 index 0000000000..f592ad352e --- /dev/null +++ b/synapse/app/synchrotron.py @@ -0,0 +1,467 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.api.constants import EventTypes +from synapse.config._base import ConfigError +from synapse.config.database import DatabaseConfig +from synapse.config.logger import LoggingConfig +from synapse.config.appservice import AppServiceConfig +from synapse.events import FrozenEvent +from synapse.handlers.presence import PresenceHandler +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.rest.client.v2_alpha import sync +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.storage.presence import UserPresenceState +from synapse.storage.roommember import RoomMemberStore +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.stringutils import random_string +from synapse.util.versionstring import get_version_string + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import contextlib +import ujson as json + +logger = logging.getLogger("synapse.app.synchrotron") + + +class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): + def read_config(self, config): + self.replication_url = config["replication_url"] + self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") + self.daemonize = config.get("daemonize") + self.pid_file = self.abspath(config.get("pid_file")) + self.macaroon_secret_key = config["macaroon_secret_key"] + self.expire_access_token = config.get("expire_access_token", False) + + def default_config(self, server_name, **kwargs): + pid_file = self.abspath("synchroton.pid") + return """\ + # Slave configuration + + # The replication listener on the synapse to talk to. + #replication_url: https://localhost:{replication_port}/_synapse/replication + + server_name: "%(server_name)s" + + listeners: + # Enable a /sync listener on the synchrontron + #- type: http + # port: {http_port} + # bind_address: "" + # Enable a ssh manhole listener on the synchrotron + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Enable a metric listener on the synchrotron + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"] + # compress: False + + report_stats: False + + daemonize: False + + pid_file: %(pid_file)s + """ % locals() + + +class SynchrotronSlavedStore( + SlavedPushRuleStore, + SlavedEventStore, + SlavedReceiptsStore, + SlavedAccountDataStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + SlavedFilteringStore, + SlavedPresenceStore, +): + def get_presence_list_accepted(self, user_localpart): + return () + + def insert_client_ip(self, user, access_token, ip, user_agent): + pass + + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + + +class SynchrotronPresence(object): + def __init__(self, hs): + self.http_client = hs.get_simple_http_client() + self.store = hs.get_datastore() + self.user_to_num_current_syncs = {} + self.syncing_users_url = hs.config.replication_url + "/syncing_users" + self.clock = hs.get_clock() + + active_presence = self.store.take_presence_startup_info() + self.user_to_current_state = { + state.user_id: state + for state in active_presence + } + + self.process_id = random_string(16) + logger.info("Presence process_id is %r", self.process_id) + + def set_state(self, user, state): + # TODO Hows this supposed to work? + pass + + get_states = PresenceHandler.get_states.__func__ + current_state_for_users = PresenceHandler.current_state_for_users.__func__ + + @defer.inlineCallbacks + def user_syncing(self, user_id, affect_presence): + if affect_presence: + curr_sync = self.user_to_num_current_syncs.get(user_id, 0) + self.user_to_num_current_syncs[user_id] = curr_sync + 1 + # TODO: Send this less frequently. + # TODO: Make sure this doesn't race. Currently we can lose updates + # if two users come online in quick sucession and the second http + # to the master completes before the first. + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users() + + def _end(): + if affect_presence: + self.user_to_num_current_syncs[user_id] -= 1 + + @contextlib.contextmanager + def _user_syncing(): + try: + yield + finally: + _end() + + defer.returnValue(_user_syncing()) + + def _send_syncing_users(self): + return self.http_client.post_json_get_json(self.syncing_users_url, { + "process_id": self.process_id, + "syncing_users": [ + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count > 0 + ], + }) + + def process_replication(self, result): + stream = result.get("presence", {"rows": []}) + for row in stream["rows"]: + ( + position, user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) = row + self.user_to_current_state[user_id] = UserPresenceState( + user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) + + +class SynchrotronTyping(object): + def __init__(self, hs): + self._latest_room_serial = 0 + self._room_serials = {} + self._room_typing = {} + + def stream_positions(self): + return {"typing": self._latest_room_serial} + + def process_replication(self, result): + stream = result.get("typing") + if stream: + self._latest_room_serial = int(stream["position"]) + + for row in stream["rows"]: + position, room_id, typing_json = row + typing = json.loads(typing_json) + self._room_serials[room_id] = position + self._room_typing[room_id] = typing + + +class SynchrotronApplicationService(object): + def notify_interested_services(self, event): + pass + + +class SynchrotronServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + sync.register_servlets(self, resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse synchrotron now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + clock = self.get_clock() + notifier = self.get_notifier() + presence_handler = self.get_presence_handler() + typing_handler = self.get_typing_handler() + + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + def notify_from_stream( + result, stream_name, stream_key, room=None, user=None + ): + stream = result.get(stream_name) + if stream: + position_index = stream["field_names"].index("position") + if room: + room_index = stream["field_names"].index(room) + if user: + user_index = stream["field_names"].index(user) + + users = () + rooms = () + for row in stream["rows"]: + position = row[position_index] + + if user: + users = (row[user_index],) + + if room: + rooms = (row[room_index],) + + notifier.on_new_event( + stream_key, position, users=users, rooms=rooms + ) + + def notify(result): + stream = result.get("events") + if stream: + max_position = stream["position"] + for row in stream["rows"]: + position = row[0] + internal = json.loads(row[1]) + event_json = json.loads(row[2]) + event = FrozenEvent(event_json, internal_metadata_dict=internal) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + notifier.on_new_room_event( + event, position, max_position, extra_users + ) + + notify_from_stream( + result, "push_rules", "push_rules_key", user="user_id" + ) + notify_from_stream( + result, "user_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "room_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "tag_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "receipts", "receipt_key", room="room_id" + ) + notify_from_stream( + result, "typing", "typing_key", room="room_id" + ) + + next_expire_broken_caches_ms = 0 + while True: + try: + args = store.stream_positions() + args.update(typing_handler.stream_positions()) + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) + yield store.process_replication(result) + typing_handler.process_replication(result) + presence_handler.process_replication(result) + notify(result) + except: + logger.exception("Error replicating from %r", replication_url) + sleep(5) + + def build_presence_handler(self): + return SynchrotronPresence(self) + + def build_typing_handler(self): + return SynchrotronTyping(self) + + +def setup(config_options): + try: + config = SynchrotronConfig.load_config( + "Synapse synchrotron", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + if not config: + sys.exit(0) + + config.setup_logging() + + database_engine = create_engine(config.database_config) + + ss = SynchrotronServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + application_service_handler=SynchrotronApplicationService(), + ) + + ss.setup() + ss.start_listening() + + change_resource_limit(ss.config.soft_file_limit) + + def start(): + ss.get_datastore().start_profiling() + ss.replicate() + + reactor.callWhenRunning(start) + + return ss + + +if __name__ == '__main__': + with LoggingContext("main"): + ps = setup(sys.argv[1:]) + + if ps.config.daemonize: + def run(): + with LoggingContext("run"): + change_resource_limit(ps.config.soft_file_limit) + reactor.run() + + daemon = Daemonize( + app="synapse-pusher", + pid=ps.config.pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + + daemon.start() + else: + reactor.run() -- cgit 1.5.1 From 80aade380545a0b661e2bbef48e175900ed4d41f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:24:19 +0100 Subject: Send updates to the syncing users every ten seconds or immediately if they've just come online --- synapse/app/synchrotron.py | 53 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 10 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f592ad352e..7b45c87a96 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -16,7 +16,7 @@ import synapse -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig @@ -41,7 +41,7 @@ from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -135,6 +135,8 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["who_forgot_in_room"] ) +UPDATE_SYNCING_USERS_MS = 10 * 1000 + class SynchrotronPresence(object): def __init__(self, hs): @@ -153,6 +155,13 @@ class SynchrotronPresence(object): self.process_id = random_string(16) logger.info("Presence process_id is %r", self.process_id) + self._sending_sync = False + self._need_to_send_sync = False + self.clock.looping_call( + self._send_syncing_users_regularly, + UPDATE_SYNCING_USERS_MS, + ) + def set_state(self, user, state): # TODO Hows this supposed to work? pass @@ -165,12 +174,10 @@ class SynchrotronPresence(object): if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 - # TODO: Send this less frequently. - # TODO: Make sure this doesn't race. Currently we can lose updates - # if two users come online in quick sucession and the second http - # to the master completes before the first. - # TODO: Don't block the sync request on this HTTP hit. - yield self._send_syncing_users() + prev_states = yield self.current_state_for_users([user_id]) + if prev_states[user_id].state == PresenceState.OFFLINE: + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users_now() def _end(): if affect_presence: @@ -185,8 +192,24 @@ class SynchrotronPresence(object): defer.returnValue(_user_syncing()) - def _send_syncing_users(self): - return self.http_client.post_json_get_json(self.syncing_users_url, { + def _send_syncing_users_regularly(self): + # Only send an update if we aren't in the middle of sending one. + if not self._sending_sync: + preserve_fn(self._send_syncing_users_now)() + + @defer.inlineCallbacks + def _send_syncing_users_now(self): + if self._sending_sync: + # We don't want to race with sending another update. + # Instead we wait for that update to finish and send another + # update afterwards. + self._need_to_send_sync = True + return + + # Flag that we are sending an update. + self._sending_sync = True + + yield self.http_client.post_json_get_json(self.syncing_users_url, { "process_id": self.process_id, "syncing_users": [ user_id for user_id, count in self.user_to_num_current_syncs.items() @@ -194,6 +217,16 @@ class SynchrotronPresence(object): ], }) + # Unset the flag as we are no longer sending an update. + self._sending_sync = False + if self._need_to_send_sync: + # If something happened while we were sending the update then + # we might need to send another update. + # TODO: Check if the update that was sent matches the current state + # as we only need to send an update if they are different. + self._need_to_send_sync = False + yield self._send_syncing_users_now() + def process_replication(self, result): stream = result.get("presence", {"rows": []}) for row in stream["rows"]: -- cgit 1.5.1 From 0b3c80a234cd8f16c8714af7e7b719dc2e635b20 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:55:01 +0100 Subject: Use ClientIpStore to record client ips --- synapse/app/synchrotron.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7b45c87a96..0446a1643d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -27,6 +27,7 @@ from synapse.http.site import SynapseSite from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.rest.client.v2_alpha import sync +from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore @@ -36,6 +37,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.server import HomeServer +from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore @@ -119,13 +121,12 @@ class SynchrotronSlavedStore( SlavedRegistrationStore, SlavedFilteringStore, SlavedPresenceStore, + BaseSlavedStore, + ClientIpStore, # After BaseSlavedStre because the constructor is different ): def get_presence_list_accepted(self, user_localpart): return () - def insert_client_ip(self, user, access_token, ip, user_agent): - pass - # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a # way to invalidate the forgotten rooms cache correctly. -- cgit 1.5.1 From da491e75b2d46c885f7fbb9240501c223e7c59bd Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:56:36 +0100 Subject: Appease flake8 --- synapse/app/synchrotron.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 0446a1643d..af06ce70d1 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -122,7 +122,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, BaseSlavedStore, - ClientIpStore, # After BaseSlavedStre because the constructor is different + ClientIpStore, # After BaseSlavedStre because the constructor is different ): def get_presence_list_accepted(self, user_localpart): return () -- cgit 1.5.1 From 48340e4f13a8090feac070ebb507e7629d03b530 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 15:02:27 +0100 Subject: Clear the list of ongoing syncs on shutdown --- synapse/app/synchrotron.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index af06ce70d1..f4b416f777 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -163,6 +163,8 @@ class SynchrotronPresence(object): UPDATE_SYNCING_USERS_MS, ) + reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown) + def set_state(self, user, state): # TODO Hows this supposed to work? pass @@ -193,6 +195,13 @@ class SynchrotronPresence(object): defer.returnValue(_user_syncing()) + @defer.inlineCallbacks + def _on_shutdown(self): + # When the synchrotron is shutdown tell the master to clear the in + # progress syncs for this process + self.user_to_num_current_syncs.clear() + yield self._send_syncing_users_now() + def _send_syncing_users_regularly(self): # Only send an update if we aren't in the middle of sending one. if not self._sending_sync: -- cgit 1.5.1 From 8f79084bd44f76223048c1bd6d836f904edcc95e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 18:03:40 +0100 Subject: Add get_presence_list_accepted to the broken caches in synchrotron --- synapse/app/synchrotron.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f4b416f777..c77854fab1 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -39,7 +39,7 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine -from synapse.storage.presence import UserPresenceState +from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree @@ -124,9 +124,6 @@ class SynchrotronSlavedStore( BaseSlavedStore, ClientIpStore, # After BaseSlavedStre because the constructor is different ): - def get_presence_list_accepted(self, user_localpart): - return () - # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a # way to invalidate the forgotten rooms cache correctly. @@ -136,6 +133,13 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["who_forgot_in_room"] ) + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + UPDATE_SYNCING_USERS_MS = 10 * 1000 @@ -357,6 +361,7 @@ class SynchrotronServer(HomeServer): def expire_broken_caches(): store.who_forgot_in_room.invalidate_all() + store.get_presence_list_accepted.invalidate_all() def notify_from_stream( result, stream_name, stream_key, room=None, user=None -- cgit 1.5.1 From ac9716f1546ae486cac435b8a577cc2c54b666d6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 18:10:00 +0100 Subject: Fix spelling --- synapse/app/synchrotron.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index c77854fab1..aa81e1c5da 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -122,7 +122,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, BaseSlavedStore, - ClientIpStore, # After BaseSlavedStre because the constructor is different + ClientIpStore, # After BaseSlavedStore because the constructor is different ): # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a -- cgit 1.5.1 From 5ef84da4f11f1b1cceb0c44d9867bb597ee68e64 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 6 Jun 2016 16:05:28 +0100 Subject: Yield on the sleeps intended to backoff replication --- synapse/app/pusher.py | 2 +- synapse/app/synchrotron.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index f1de1e7ce9..3c3fa38053 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -311,7 +311,7 @@ class PusherServer(HomeServer): poke_pushers(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(30) + yield sleep(30) def setup(config_options): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index aa81e1c5da..7273055cc1 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -443,7 +443,7 @@ class SynchrotronServer(HomeServer): notify(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(5) + yield sleep(5) def build_presence_handler(self): return SynchrotronPresence(self) -- cgit 1.5.1 From 4a5bbb1941ae63f1d6632aa35e80274e56c8dbb9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 6 Jun 2016 16:37:12 +0100 Subject: Fix a KeyError in the synchrotron presence --- synapse/app/synchrotron.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index aa81e1c5da..3d0d5cc15a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -187,7 +187,10 @@ class SynchrotronPresence(object): yield self._send_syncing_users_now() def _end(): - if affect_presence: + # We check that the user_id is in user_to_num_current_syncs because + # user_to_num_current_syncs may have been cleared if we are + # shutting down. + if affect_presence and user_id in self.user_to_num_current_syncs: self.user_to_num_current_syncs[user_id] -= 1 @contextlib.contextmanager -- cgit 1.5.1 From dded389ac16ec023c986df400d25ca94a4a28677 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 15:45:56 +0100 Subject: Allow setting of gc.set_thresholds --- synapse/app/homeserver.py | 5 +++++ synapse/app/pusher.py | 5 +++++ synapse/app/synchrotron.py | 15 ++++++++++----- synapse/config/server.py | 19 ++++++++++++++++++- 4 files changed, 38 insertions(+), 6 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index df675c0ed4..22e1721fc4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -16,6 +16,7 @@ import synapse +import gc import logging import os import sys @@ -351,6 +352,8 @@ class SynapseService(service.Service): def startService(self): hs = setup(self.config) change_resource_limit(hs.config.soft_file_limit) + if hs.config.gc_thresholds: + gc.set_threshold(*hs.config.gc_thresholds) def stopService(self): return self._port.stopListening() @@ -422,6 +425,8 @@ def run(hs): # sys.settrace(logcontext_tracer) with LoggingContext("run"): change_resource_limit(hs.config.soft_file_limit) + if hs.config.gc_thresholds: + gc.set_threshold(*hs.config.gc_thresholds) reactor.run() if hs.config.daemonize: diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 3c3fa38053..7e2bf7ecc2 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -43,6 +43,7 @@ from twisted.web.resource import Resource from daemonize import Daemonize +import gc import sys import logging @@ -342,6 +343,8 @@ def setup(config_options): ps.start_listening() change_resource_limit(ps.config.soft_file_limit) + if ps.config.gc_thresholds: + gc.set_threshold(*ps.config.gc_thresholds) def start(): ps.replicate() @@ -361,6 +364,8 @@ if __name__ == '__main__': def run(): with LoggingContext("run"): change_resource_limit(ps.config.soft_file_limit) + if ps.config.gc_thresholds: + gc.set_threshold(*ps.config.gc_thresholds) reactor.run() daemon = Daemonize( diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 5c552ffb29..f9673ab8d8 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -57,6 +57,7 @@ from daemonize import Daemonize import sys import logging import contextlib +import gc import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -484,6 +485,8 @@ def setup(config_options): ss.start_listening() change_resource_limit(ss.config.soft_file_limit) + if ss.config.gc_thresholds: + ss.set_threshold(*ss.config.gc_thresholds) def start(): ss.get_datastore().start_profiling() @@ -496,17 +499,19 @@ def setup(config_options): if __name__ == '__main__': with LoggingContext("main"): - ps = setup(sys.argv[1:]) + ss = setup(sys.argv[1:]) - if ps.config.daemonize: + if ss.config.daemonize: def run(): with LoggingContext("run"): - change_resource_limit(ps.config.soft_file_limit) + change_resource_limit(ss.config.soft_file_limit) + if ss.config.gc_thresholds: + gc.set_threshold(*ss.config.gc_thresholds) reactor.run() daemon = Daemonize( - app="synapse-pusher", - pid=ps.config.pid_file, + app="synapse-synchrotron", + pid=ss.config.pid_file, action=run, auto_close_fds=False, verbose=True, diff --git a/synapse/config/server.py b/synapse/config/server.py index c2d8f8a52f..44b8d422e0 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import Config +from ._base import Config, ConfigError class ServerConfig(Config): @@ -38,6 +38,20 @@ class ServerConfig(Config): self.listeners = config.get("listeners", []) + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + bind_port = config.get("bind_port") if bind_port: self.listeners = [] @@ -157,6 +171,9 @@ class ServerConfig(Config): # hard limit. soft_file_limit: 0 + # The GC threshold parameters to pass to `gc.set_threshold`, if defined + # gc_thresholds: [700, 10, 10] + # A list of other Home Servers to fetch the public room directory from # and include in the public room directory of this home server # This is a temporary stopgap solution to populate new server with a -- cgit 1.5.1 From 2d1d1025fac846e2746dc627c0ebb6542c1488d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 16:26:25 +0100 Subject: Add gc_threshold to pusher and synchrotron --- synapse/app/pusher.py | 14 ++++++++++++++ synapse/app/synchrotron.py | 14 ++++++++++++++ 2 files changed, 28 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 7e2bf7ecc2..4ec23d84c1 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -65,6 +65,20 @@ class SlaveConfig(DatabaseConfig): self.pid_file = self.abspath(config.get("pid_file")) self.public_baseurl = config["public_baseurl"] + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + # some things used by the auth handler but not actually used in the # pusher codebase self.bcrypt_rounds = None diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f9673ab8d8..297e199453 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -78,6 +78,20 @@ class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): self.macaroon_secret_key = config["macaroon_secret_key"] self.expire_access_token = config.get("expire_access_token", False) + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + def default_config(self, server_name, **kwargs): pid_file = self.abspath("synchroton.pid") return """\ -- cgit 1.5.1