From 1f31cc37f8611f9ae5612ef5be82e63735fbdf34 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 2 Jun 2016 17:21:31 +0100 Subject: Working unsubscribe links going straight to the HS and authed by macaroons that let you delete pushers and nothing else --- synapse/app/pusher.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 135dd58c15..f1de1e7ce9 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -21,6 +21,7 @@ from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig from synapse.config.emailconfig import EmailConfig +from synapse.config.key import KeyConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.storage.roommember import RoomMemberStore @@ -63,6 +64,26 @@ class SlaveConfig(DatabaseConfig): self.pid_file = self.abspath(config.get("pid_file")) self.public_baseurl = config["public_baseurl"] + # some things used by the auth handler but not actually used in the + # pusher codebase + self.bcrypt_rounds = None + self.ldap_enabled = None + self.ldap_server = None + self.ldap_port = None + self.ldap_tls = None + self.ldap_search_base = None + self.ldap_search_property = None + self.ldap_email_property = None + self.ldap_full_name_property = None + + # We would otherwise try to use the registration shared secret as the + # macaroon shared secret if there was no macaroon_shared_secret, but + # that means pulling in RegistrationConfig too. We don't need to be + # backwards compaitible in the pusher codebase so just make people set + # macaroon_shared_secret. We set this to None to prevent it referencing + # an undefined key. + self.registration_shared_secret = None + def default_config(self, server_name, **kwargs): pid_file = self.abspath("pusher.pid") return """\ @@ -95,7 +116,7 @@ class SlaveConfig(DatabaseConfig): """ % locals() -class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig): +class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig): pass -- cgit 1.5.1 From abb151f3c9bf78f2825dba18da6bbc88ce61d32c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 11:57:26 +0100 Subject: Add a separate process that can handle /sync requests --- synapse/app/synchrotron.py | 467 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 467 insertions(+) create mode 100644 synapse/app/synchrotron.py (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py new file mode 100644 index 0000000000..f592ad352e --- /dev/null +++ b/synapse/app/synchrotron.py @@ -0,0 +1,467 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.api.constants import EventTypes +from synapse.config._base import ConfigError +from synapse.config.database import DatabaseConfig +from synapse.config.logger import LoggingConfig +from synapse.config.appservice import AppServiceConfig +from synapse.events import FrozenEvent +from synapse.handlers.presence import PresenceHandler +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.rest.client.v2_alpha import sync +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.storage.presence import UserPresenceState +from synapse.storage.roommember import RoomMemberStore +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.stringutils import random_string +from synapse.util.versionstring import get_version_string + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import contextlib +import ujson as json + +logger = logging.getLogger("synapse.app.synchrotron") + + +class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): + def read_config(self, config): + self.replication_url = config["replication_url"] + self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") + self.daemonize = config.get("daemonize") + self.pid_file = self.abspath(config.get("pid_file")) + self.macaroon_secret_key = config["macaroon_secret_key"] + self.expire_access_token = config.get("expire_access_token", False) + + def default_config(self, server_name, **kwargs): + pid_file = self.abspath("synchroton.pid") + return """\ + # Slave configuration + + # The replication listener on the synapse to talk to. + #replication_url: https://localhost:{replication_port}/_synapse/replication + + server_name: "%(server_name)s" + + listeners: + # Enable a /sync listener on the synchrontron + #- type: http + # port: {http_port} + # bind_address: "" + # Enable a ssh manhole listener on the synchrotron + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Enable a metric listener on the synchrotron + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"] + # compress: False + + report_stats: False + + daemonize: False + + pid_file: %(pid_file)s + """ % locals() + + +class SynchrotronSlavedStore( + SlavedPushRuleStore, + SlavedEventStore, + SlavedReceiptsStore, + SlavedAccountDataStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + SlavedFilteringStore, + SlavedPresenceStore, +): + def get_presence_list_accepted(self, user_localpart): + return () + + def insert_client_ip(self, user, access_token, ip, user_agent): + pass + + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + + +class SynchrotronPresence(object): + def __init__(self, hs): + self.http_client = hs.get_simple_http_client() + self.store = hs.get_datastore() + self.user_to_num_current_syncs = {} + self.syncing_users_url = hs.config.replication_url + "/syncing_users" + self.clock = hs.get_clock() + + active_presence = self.store.take_presence_startup_info() + self.user_to_current_state = { + state.user_id: state + for state in active_presence + } + + self.process_id = random_string(16) + logger.info("Presence process_id is %r", self.process_id) + + def set_state(self, user, state): + # TODO Hows this supposed to work? + pass + + get_states = PresenceHandler.get_states.__func__ + current_state_for_users = PresenceHandler.current_state_for_users.__func__ + + @defer.inlineCallbacks + def user_syncing(self, user_id, affect_presence): + if affect_presence: + curr_sync = self.user_to_num_current_syncs.get(user_id, 0) + self.user_to_num_current_syncs[user_id] = curr_sync + 1 + # TODO: Send this less frequently. + # TODO: Make sure this doesn't race. Currently we can lose updates + # if two users come online in quick sucession and the second http + # to the master completes before the first. + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users() + + def _end(): + if affect_presence: + self.user_to_num_current_syncs[user_id] -= 1 + + @contextlib.contextmanager + def _user_syncing(): + try: + yield + finally: + _end() + + defer.returnValue(_user_syncing()) + + def _send_syncing_users(self): + return self.http_client.post_json_get_json(self.syncing_users_url, { + "process_id": self.process_id, + "syncing_users": [ + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count > 0 + ], + }) + + def process_replication(self, result): + stream = result.get("presence", {"rows": []}) + for row in stream["rows"]: + ( + position, user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) = row + self.user_to_current_state[user_id] = UserPresenceState( + user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) + + +class SynchrotronTyping(object): + def __init__(self, hs): + self._latest_room_serial = 0 + self._room_serials = {} + self._room_typing = {} + + def stream_positions(self): + return {"typing": self._latest_room_serial} + + def process_replication(self, result): + stream = result.get("typing") + if stream: + self._latest_room_serial = int(stream["position"]) + + for row in stream["rows"]: + position, room_id, typing_json = row + typing = json.loads(typing_json) + self._room_serials[room_id] = position + self._room_typing[room_id] = typing + + +class SynchrotronApplicationService(object): + def notify_interested_services(self, event): + pass + + +class SynchrotronServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + sync.register_servlets(self, resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse synchrotron now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + clock = self.get_clock() + notifier = self.get_notifier() + presence_handler = self.get_presence_handler() + typing_handler = self.get_typing_handler() + + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + def notify_from_stream( + result, stream_name, stream_key, room=None, user=None + ): + stream = result.get(stream_name) + if stream: + position_index = stream["field_names"].index("position") + if room: + room_index = stream["field_names"].index(room) + if user: + user_index = stream["field_names"].index(user) + + users = () + rooms = () + for row in stream["rows"]: + position = row[position_index] + + if user: + users = (row[user_index],) + + if room: + rooms = (row[room_index],) + + notifier.on_new_event( + stream_key, position, users=users, rooms=rooms + ) + + def notify(result): + stream = result.get("events") + if stream: + max_position = stream["position"] + for row in stream["rows"]: + position = row[0] + internal = json.loads(row[1]) + event_json = json.loads(row[2]) + event = FrozenEvent(event_json, internal_metadata_dict=internal) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + notifier.on_new_room_event( + event, position, max_position, extra_users + ) + + notify_from_stream( + result, "push_rules", "push_rules_key", user="user_id" + ) + notify_from_stream( + result, "user_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "room_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "tag_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "receipts", "receipt_key", room="room_id" + ) + notify_from_stream( + result, "typing", "typing_key", room="room_id" + ) + + next_expire_broken_caches_ms = 0 + while True: + try: + args = store.stream_positions() + args.update(typing_handler.stream_positions()) + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) + yield store.process_replication(result) + typing_handler.process_replication(result) + presence_handler.process_replication(result) + notify(result) + except: + logger.exception("Error replicating from %r", replication_url) + sleep(5) + + def build_presence_handler(self): + return SynchrotronPresence(self) + + def build_typing_handler(self): + return SynchrotronTyping(self) + + +def setup(config_options): + try: + config = SynchrotronConfig.load_config( + "Synapse synchrotron", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + if not config: + sys.exit(0) + + config.setup_logging() + + database_engine = create_engine(config.database_config) + + ss = SynchrotronServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + application_service_handler=SynchrotronApplicationService(), + ) + + ss.setup() + ss.start_listening() + + change_resource_limit(ss.config.soft_file_limit) + + def start(): + ss.get_datastore().start_profiling() + ss.replicate() + + reactor.callWhenRunning(start) + + return ss + + +if __name__ == '__main__': + with LoggingContext("main"): + ps = setup(sys.argv[1:]) + + if ps.config.daemonize: + def run(): + with LoggingContext("run"): + change_resource_limit(ps.config.soft_file_limit) + reactor.run() + + daemon = Daemonize( + app="synapse-pusher", + pid=ps.config.pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + + daemon.start() + else: + reactor.run() -- cgit 1.5.1 From 80aade380545a0b661e2bbef48e175900ed4d41f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:24:19 +0100 Subject: Send updates to the syncing users every ten seconds or immediately if they've just come online --- synapse/app/synchrotron.py | 53 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 10 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f592ad352e..7b45c87a96 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -16,7 +16,7 @@ import synapse -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig @@ -41,7 +41,7 @@ from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -135,6 +135,8 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["who_forgot_in_room"] ) +UPDATE_SYNCING_USERS_MS = 10 * 1000 + class SynchrotronPresence(object): def __init__(self, hs): @@ -153,6 +155,13 @@ class SynchrotronPresence(object): self.process_id = random_string(16) logger.info("Presence process_id is %r", self.process_id) + self._sending_sync = False + self._need_to_send_sync = False + self.clock.looping_call( + self._send_syncing_users_regularly, + UPDATE_SYNCING_USERS_MS, + ) + def set_state(self, user, state): # TODO Hows this supposed to work? pass @@ -165,12 +174,10 @@ class SynchrotronPresence(object): if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 - # TODO: Send this less frequently. - # TODO: Make sure this doesn't race. Currently we can lose updates - # if two users come online in quick sucession and the second http - # to the master completes before the first. - # TODO: Don't block the sync request on this HTTP hit. - yield self._send_syncing_users() + prev_states = yield self.current_state_for_users([user_id]) + if prev_states[user_id].state == PresenceState.OFFLINE: + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users_now() def _end(): if affect_presence: @@ -185,8 +192,24 @@ class SynchrotronPresence(object): defer.returnValue(_user_syncing()) - def _send_syncing_users(self): - return self.http_client.post_json_get_json(self.syncing_users_url, { + def _send_syncing_users_regularly(self): + # Only send an update if we aren't in the middle of sending one. + if not self._sending_sync: + preserve_fn(self._send_syncing_users_now)() + + @defer.inlineCallbacks + def _send_syncing_users_now(self): + if self._sending_sync: + # We don't want to race with sending another update. + # Instead we wait for that update to finish and send another + # update afterwards. + self._need_to_send_sync = True + return + + # Flag that we are sending an update. + self._sending_sync = True + + yield self.http_client.post_json_get_json(self.syncing_users_url, { "process_id": self.process_id, "syncing_users": [ user_id for user_id, count in self.user_to_num_current_syncs.items() @@ -194,6 +217,16 @@ class SynchrotronPresence(object): ], }) + # Unset the flag as we are no longer sending an update. + self._sending_sync = False + if self._need_to_send_sync: + # If something happened while we were sending the update then + # we might need to send another update. + # TODO: Check if the update that was sent matches the current state + # as we only need to send an update if they are different. + self._need_to_send_sync = False + yield self._send_syncing_users_now() + def process_replication(self, result): stream = result.get("presence", {"rows": []}) for row in stream["rows"]: -- cgit 1.5.1 From 0b3c80a234cd8f16c8714af7e7b719dc2e635b20 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:55:01 +0100 Subject: Use ClientIpStore to record client ips --- synapse/app/synchrotron.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7b45c87a96..0446a1643d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -27,6 +27,7 @@ from synapse.http.site import SynapseSite from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.rest.client.v2_alpha import sync +from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore @@ -36,6 +37,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.server import HomeServer +from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore @@ -119,13 +121,12 @@ class SynchrotronSlavedStore( SlavedRegistrationStore, SlavedFilteringStore, SlavedPresenceStore, + BaseSlavedStore, + ClientIpStore, # After BaseSlavedStre because the constructor is different ): def get_presence_list_accepted(self, user_localpart): return () - def insert_client_ip(self, user, access_token, ip, user_agent): - pass - # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a # way to invalidate the forgotten rooms cache correctly. -- cgit 1.5.1 From da491e75b2d46c885f7fbb9240501c223e7c59bd Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:56:36 +0100 Subject: Appease flake8 --- synapse/app/synchrotron.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 0446a1643d..af06ce70d1 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -122,7 +122,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, BaseSlavedStore, - ClientIpStore, # After BaseSlavedStre because the constructor is different + ClientIpStore, # After BaseSlavedStre because the constructor is different ): def get_presence_list_accepted(self, user_localpart): return () -- cgit 1.5.1 From 48340e4f13a8090feac070ebb507e7629d03b530 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 15:02:27 +0100 Subject: Clear the list of ongoing syncs on shutdown --- synapse/app/synchrotron.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index af06ce70d1..f4b416f777 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -163,6 +163,8 @@ class SynchrotronPresence(object): UPDATE_SYNCING_USERS_MS, ) + reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown) + def set_state(self, user, state): # TODO Hows this supposed to work? pass @@ -193,6 +195,13 @@ class SynchrotronPresence(object): defer.returnValue(_user_syncing()) + @defer.inlineCallbacks + def _on_shutdown(self): + # When the synchrotron is shutdown tell the master to clear the in + # progress syncs for this process + self.user_to_num_current_syncs.clear() + yield self._send_syncing_users_now() + def _send_syncing_users_regularly(self): # Only send an update if we aren't in the middle of sending one. if not self._sending_sync: -- cgit 1.5.1 From 8f79084bd44f76223048c1bd6d836f904edcc95e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 18:03:40 +0100 Subject: Add get_presence_list_accepted to the broken caches in synchrotron --- synapse/app/synchrotron.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f4b416f777..c77854fab1 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -39,7 +39,7 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine -from synapse.storage.presence import UserPresenceState +from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree @@ -124,9 +124,6 @@ class SynchrotronSlavedStore( BaseSlavedStore, ClientIpStore, # After BaseSlavedStre because the constructor is different ): - def get_presence_list_accepted(self, user_localpart): - return () - # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a # way to invalidate the forgotten rooms cache correctly. @@ -136,6 +133,13 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["who_forgot_in_room"] ) + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + UPDATE_SYNCING_USERS_MS = 10 * 1000 @@ -357,6 +361,7 @@ class SynchrotronServer(HomeServer): def expire_broken_caches(): store.who_forgot_in_room.invalidate_all() + store.get_presence_list_accepted.invalidate_all() def notify_from_stream( result, stream_name, stream_key, room=None, user=None -- cgit 1.5.1 From ac9716f1546ae486cac435b8a577cc2c54b666d6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 18:10:00 +0100 Subject: Fix spelling --- synapse/app/synchrotron.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index c77854fab1..aa81e1c5da 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -122,7 +122,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, BaseSlavedStore, - ClientIpStore, # After BaseSlavedStre because the constructor is different + ClientIpStore, # After BaseSlavedStore because the constructor is different ): # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a -- cgit 1.5.1 From 5ef84da4f11f1b1cceb0c44d9867bb597ee68e64 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 6 Jun 2016 16:05:28 +0100 Subject: Yield on the sleeps intended to backoff replication --- synapse/app/pusher.py | 2 +- synapse/app/synchrotron.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index f1de1e7ce9..3c3fa38053 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -311,7 +311,7 @@ class PusherServer(HomeServer): poke_pushers(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(30) + yield sleep(30) def setup(config_options): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index aa81e1c5da..7273055cc1 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -443,7 +443,7 @@ class SynchrotronServer(HomeServer): notify(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(5) + yield sleep(5) def build_presence_handler(self): return SynchrotronPresence(self) -- cgit 1.5.1 From 4a5bbb1941ae63f1d6632aa35e80274e56c8dbb9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 6 Jun 2016 16:37:12 +0100 Subject: Fix a KeyError in the synchrotron presence --- synapse/app/synchrotron.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index aa81e1c5da..3d0d5cc15a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -187,7 +187,10 @@ class SynchrotronPresence(object): yield self._send_syncing_users_now() def _end(): - if affect_presence: + # We check that the user_id is in user_to_num_current_syncs because + # user_to_num_current_syncs may have been cleared if we are + # shutting down. + if affect_presence and user_id in self.user_to_num_current_syncs: self.user_to_num_current_syncs[user_id] -= 1 @contextlib.contextmanager -- cgit 1.5.1 From dded389ac16ec023c986df400d25ca94a4a28677 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 15:45:56 +0100 Subject: Allow setting of gc.set_thresholds --- synapse/app/homeserver.py | 5 +++++ synapse/app/pusher.py | 5 +++++ synapse/app/synchrotron.py | 15 ++++++++++----- synapse/config/server.py | 19 ++++++++++++++++++- 4 files changed, 38 insertions(+), 6 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index df675c0ed4..22e1721fc4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -16,6 +16,7 @@ import synapse +import gc import logging import os import sys @@ -351,6 +352,8 @@ class SynapseService(service.Service): def startService(self): hs = setup(self.config) change_resource_limit(hs.config.soft_file_limit) + if hs.config.gc_thresholds: + gc.set_threshold(*hs.config.gc_thresholds) def stopService(self): return self._port.stopListening() @@ -422,6 +425,8 @@ def run(hs): # sys.settrace(logcontext_tracer) with LoggingContext("run"): change_resource_limit(hs.config.soft_file_limit) + if hs.config.gc_thresholds: + gc.set_threshold(*hs.config.gc_thresholds) reactor.run() if hs.config.daemonize: diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 3c3fa38053..7e2bf7ecc2 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -43,6 +43,7 @@ from twisted.web.resource import Resource from daemonize import Daemonize +import gc import sys import logging @@ -342,6 +343,8 @@ def setup(config_options): ps.start_listening() change_resource_limit(ps.config.soft_file_limit) + if ps.config.gc_thresholds: + gc.set_threshold(*ps.config.gc_thresholds) def start(): ps.replicate() @@ -361,6 +364,8 @@ if __name__ == '__main__': def run(): with LoggingContext("run"): change_resource_limit(ps.config.soft_file_limit) + if ps.config.gc_thresholds: + gc.set_threshold(*ps.config.gc_thresholds) reactor.run() daemon = Daemonize( diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 5c552ffb29..f9673ab8d8 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -57,6 +57,7 @@ from daemonize import Daemonize import sys import logging import contextlib +import gc import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -484,6 +485,8 @@ def setup(config_options): ss.start_listening() change_resource_limit(ss.config.soft_file_limit) + if ss.config.gc_thresholds: + ss.set_threshold(*ss.config.gc_thresholds) def start(): ss.get_datastore().start_profiling() @@ -496,17 +499,19 @@ def setup(config_options): if __name__ == '__main__': with LoggingContext("main"): - ps = setup(sys.argv[1:]) + ss = setup(sys.argv[1:]) - if ps.config.daemonize: + if ss.config.daemonize: def run(): with LoggingContext("run"): - change_resource_limit(ps.config.soft_file_limit) + change_resource_limit(ss.config.soft_file_limit) + if ss.config.gc_thresholds: + gc.set_threshold(*ss.config.gc_thresholds) reactor.run() daemon = Daemonize( - app="synapse-pusher", - pid=ps.config.pid_file, + app="synapse-synchrotron", + pid=ss.config.pid_file, action=run, auto_close_fds=False, verbose=True, diff --git a/synapse/config/server.py b/synapse/config/server.py index c2d8f8a52f..44b8d422e0 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import Config +from ._base import Config, ConfigError class ServerConfig(Config): @@ -38,6 +38,20 @@ class ServerConfig(Config): self.listeners = config.get("listeners", []) + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + bind_port = config.get("bind_port") if bind_port: self.listeners = [] @@ -157,6 +171,9 @@ class ServerConfig(Config): # hard limit. soft_file_limit: 0 + # The GC threshold parameters to pass to `gc.set_threshold`, if defined + # gc_thresholds: [700, 10, 10] + # A list of other Home Servers to fetch the public room directory from # and include in the public room directory of this home server # This is a temporary stopgap solution to populate new server with a -- cgit 1.5.1 From 2d1d1025fac846e2746dc627c0ebb6542c1488d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 16:26:25 +0100 Subject: Add gc_threshold to pusher and synchrotron --- synapse/app/pusher.py | 14 ++++++++++++++ synapse/app/synchrotron.py | 14 ++++++++++++++ 2 files changed, 28 insertions(+) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 7e2bf7ecc2..4ec23d84c1 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -65,6 +65,20 @@ class SlaveConfig(DatabaseConfig): self.pid_file = self.abspath(config.get("pid_file")) self.public_baseurl = config["public_baseurl"] + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + # some things used by the auth handler but not actually used in the # pusher codebase self.bcrypt_rounds = None diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f9673ab8d8..297e199453 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -78,6 +78,20 @@ class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): self.macaroon_secret_key = config["macaroon_secret_key"] self.expire_access_token = config.get("expire_access_token", False) + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + def default_config(self, server_name, **kwargs): pid_file = self.abspath("synchroton.pid") return """\ -- cgit 1.5.1 From 7dbb473339bc41daf6c05b64756f97e011f653f5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 9 Jun 2016 18:50:38 +0100 Subject: Add function to load config without generating it Renames ``load_config`` to ``load_or_generate_config`` Adds a method called ``load_config`` that just loads the config. The main synapse.app.homeserver will continue to use ``load_or_generate_config`` to retain backwards compat. However new worker processes can use ``load_config`` to load the config avoiding some of the cruft needed to generate the config. As the new ``load_config`` method is expected to be used by new configs it removes support for the legacy commandline overrides that ``load_or_generate_config`` supports --- synapse/app/homeserver.py | 3 +- synapse/config/_base.py | 147 ++++++++++++++++++++++++++++++------------ tests/config/test_generate.py | 2 +- tests/config/test_load.py | 22 ++++++- 4 files changed, 126 insertions(+), 48 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 22e1721fc4..40ffd9bf0d 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -266,10 +266,9 @@ def setup(config_options): HomeServer """ try: - config = HomeServerConfig.load_config( + config = HomeServerConfig.load_or_generate_config( "Synapse Homeserver", config_options, - generate_section="Homeserver" ) except ConfigError as e: sys.stderr.write("\n" + e.message + "\n") diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 7449f36491..af9f17bf7b 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -157,9 +157,40 @@ class Config(object): return default_config, config @classmethod - def load_config(cls, description, argv, generate_section=None): + def load_config(cls, description, argv): + config_parser = argparse.ArgumentParser( + description=description, + ) + config_parser.add_argument( + "-c", "--config-path", + action="append", + metavar="CONFIG_FILE", + help="Specify config file. Can be given multiple times and" + " may specify directories containing *.yaml files." + ) + + config_parser.add_argument( + "--keys-directory", + metavar="DIRECTORY", + help="Where files such as certs and signing keys are stored when" + " their location is given explicitly in the config." + " Defaults to the directory containing the last config file", + ) + + config_args = config_parser.parse_args(argv) + + config_files = find_config_files(search_paths=config_args.config_path) + obj = cls() + obj.read_config_files( + config_files, + keys_directory=config_args.keys_directory, + generate_keys=False, + ) + return obj + @classmethod + def load_or_generate_config(cls, description, argv): config_parser = argparse.ArgumentParser(add_help=False) config_parser.add_argument( "-c", "--config-path", @@ -176,7 +207,7 @@ class Config(object): config_parser.add_argument( "--report-stats", action="store", - help="Stuff", + help="Whether the generated config reports anonymized usage statistics", choices=["yes", "no"] ) config_parser.add_argument( @@ -197,36 +228,11 @@ class Config(object): ) config_args, remaining_args = config_parser.parse_known_args(argv) + config_files = find_config_files(search_paths=config_args.config_path) + generate_keys = config_args.generate_keys - config_files = [] - if config_args.config_path: - for config_path in config_args.config_path: - if os.path.isdir(config_path): - # We accept specifying directories as config paths, we search - # inside that directory for all files matching *.yaml, and then - # we apply them in *sorted* order. - files = [] - for entry in os.listdir(config_path): - entry_path = os.path.join(config_path, entry) - if not os.path.isfile(entry_path): - print ( - "Found subdirectory in config directory: %r. IGNORING." - ) % (entry_path, ) - continue - - if not entry.endswith(".yaml"): - print ( - "Found file in config directory that does not" - " end in '.yaml': %r. IGNORING." - ) % (entry_path, ) - continue - - files.append(entry_path) - - config_files.extend(sorted(files)) - else: - config_files.append(config_path) + obj = cls() if config_args.generate_config: if config_args.report_stats is None: @@ -299,28 +305,43 @@ class Config(object): " -c CONFIG-FILE\"" ) - if config_args.keys_directory: - config_dir_path = config_args.keys_directory - else: - config_dir_path = os.path.dirname(config_args.config_path[-1]) - config_dir_path = os.path.abspath(config_dir_path) + obj.read_config_files( + config_files, + keys_directory=config_args.keys_directory, + generate_keys=generate_keys, + ) + + if generate_keys: + return None + + obj.invoke_all("read_arguments", args) + + return obj + + def read_config_files(self, config_files, keys_directory=None, + generate_keys=False): + if not keys_directory: + keys_directory = os.path.dirname(config_files[-1]) + + config_dir_path = os.path.abspath(keys_directory) specified_config = {} for config_file in config_files: - yaml_config = cls.read_config_file(config_file) + yaml_config = self.read_config_file(config_file) specified_config.update(yaml_config) if "server_name" not in specified_config: raise ConfigError(MISSING_SERVER_NAME) server_name = specified_config["server_name"] - _, config = obj.generate_config( + _, config = self.generate_config( config_dir_path=config_dir_path, server_name=server_name, is_generating_file=False, ) config.pop("log_config") config.update(specified_config) + if "report_stats" not in config: raise ConfigError( MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" + @@ -328,11 +349,51 @@ class Config(object): ) if generate_keys: - obj.invoke_all("generate_files", config) + self.invoke_all("generate_files", config) return - obj.invoke_all("read_config", config) - - obj.invoke_all("read_arguments", args) - - return obj + self.invoke_all("read_config", config) + + +def find_config_files(search_paths): + """Finds config files using a list of search paths. If a path is a file + then that file path is added to the list. If a search path is a directory + then all the "*.yaml" files in that directory are added to the list in + sorted order. + + Args: + search_paths(list(str)): A list of paths to search. + + Returns: + list(str): A list of file paths. + """ + + config_files = [] + if search_paths: + for config_path in search_paths: + if os.path.isdir(config_path): + # We accept specifying directories as config paths, we search + # inside that directory for all files matching *.yaml, and then + # we apply them in *sorted* order. + files = [] + for entry in os.listdir(config_path): + entry_path = os.path.join(config_path, entry) + if not os.path.isfile(entry_path): + print ( + "Found subdirectory in config directory: %r. IGNORING." + ) % (entry_path, ) + continue + + if not entry.endswith(".yaml"): + print ( + "Found file in config directory that does not" + " end in '.yaml': %r. IGNORING." + ) % (entry_path, ) + continue + + files.append(entry_path) + + config_files.extend(sorted(files)) + else: + config_files.append(config_path) + return config_files diff --git a/tests/config/test_generate.py b/tests/config/test_generate.py index 4329d73974..8f57fbeb23 100644 --- a/tests/config/test_generate.py +++ b/tests/config/test_generate.py @@ -30,7 +30,7 @@ class ConfigGenerationTestCase(unittest.TestCase): shutil.rmtree(self.dir) def test_generate_config_generates_files(self): - HomeServerConfig.load_config("", [ + HomeServerConfig.load_or_generate_config("", [ "--generate-config", "-c", self.file, "--report-stats=yes", diff --git a/tests/config/test_load.py b/tests/config/test_load.py index bf46233c5c..161a87d7e3 100644 --- a/tests/config/test_load.py +++ b/tests/config/test_load.py @@ -34,6 +34,8 @@ class ConfigLoadingTestCase(unittest.TestCase): self.generate_config_and_remove_lines_containing("server_name") with self.assertRaises(Exception): HomeServerConfig.load_config("", ["-c", self.file]) + with self.assertRaises(Exception): + HomeServerConfig.load_or_generate_config("", ["-c", self.file]) def test_generates_and_loads_macaroon_secret_key(self): self.generate_config() @@ -54,11 +56,24 @@ class ConfigLoadingTestCase(unittest.TestCase): "was: %r" % (config.macaroon_secret_key,) ) + config = HomeServerConfig.load_or_generate_config("", ["-c", self.file]) + self.assertTrue( + hasattr(config, "macaroon_secret_key"), + "Want config to have attr macaroon_secret_key" + ) + if len(config.macaroon_secret_key) < 5: + self.fail( + "Want macaroon secret key to be string of at least length 5," + "was: %r" % (config.macaroon_secret_key,) + ) + def test_load_succeeds_if_macaroon_secret_key_missing(self): self.generate_config_and_remove_lines_containing("macaroon") config1 = HomeServerConfig.load_config("", ["-c", self.file]) config2 = HomeServerConfig.load_config("", ["-c", self.file]) + config3 = HomeServerConfig.load_or_generate_config("", ["-c", self.file]) self.assertEqual(config1.macaroon_secret_key, config2.macaroon_secret_key) + self.assertEqual(config1.macaroon_secret_key, config3.macaroon_secret_key) def test_disable_registration(self): self.generate_config() @@ -70,14 +85,17 @@ class ConfigLoadingTestCase(unittest.TestCase): config = HomeServerConfig.load_config("", ["-c", self.file]) self.assertFalse(config.enable_registration) + config = HomeServerConfig.load_or_generate_config("", ["-c", self.file]) + self.assertFalse(config.enable_registration) + # Check that either config value is clobbered by the command line. - config = HomeServerConfig.load_config("", [ + config = HomeServerConfig.load_or_generate_config("", [ "-c", self.file, "--enable-registration" ]) self.assertTrue(config.enable_registration) def generate_config(self): - HomeServerConfig.load_config("", [ + HomeServerConfig.load_or_generate_config("", [ "--generate-config", "-c", self.file, "--report-stats=yes", -- cgit 1.5.1 From 885ee861f7270fef1370a2d63e009a8fceaf8dd5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Jun 2016 11:06:12 +0100 Subject: Inline the synchrotron and pusher configs into the main config --- synapse/app/pusher.py | 171 ++++++++++++------------------------------- synapse/app/synchrotron.py | 137 +++++++++------------------------- synapse/config/homeserver.py | 4 +- synapse/config/logger.py | 102 +++++++++++++------------- synapse/config/server.py | 31 ++++---- 5 files changed, 154 insertions(+), 291 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 4ec23d84c1..6c8c02fb38 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -18,10 +18,9 @@ import synapse from synapse.server import HomeServer from synapse.config._base import ConfigError -from synapse.config.database import DatabaseConfig -from synapse.config.logger import LoggingConfig -from synapse.config.emailconfig import EmailConfig -from synapse.config.key import KeyConfig +from synapse.config.workers import clobber_with_worker_config +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.storage.roommember import RoomMemberStore @@ -43,98 +42,12 @@ from twisted.web.resource import Resource from daemonize import Daemonize -import gc import sys import logging logger = logging.getLogger("synapse.app.pusher") -class SlaveConfig(DatabaseConfig): - def read_config(self, config): - self.replication_url = config["replication_url"] - self.server_name = config["server_name"] - self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( - "use_insecure_ssl_client_just_for_testing_do_not_use", False - ) - self.user_agent_suffix = None - self.start_pushers = True - self.listeners = config["listeners"] - self.soft_file_limit = config.get("soft_file_limit") - self.daemonize = config.get("daemonize") - self.pid_file = self.abspath(config.get("pid_file")) - self.public_baseurl = config["public_baseurl"] - - thresholds = config.get("gc_thresholds", None) - if thresholds is not None: - try: - assert len(thresholds) == 3 - self.gc_thresholds = ( - int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), - ) - except: - raise ConfigError( - "Value of `gc_threshold` must be a list of three integers if set" - ) - else: - self.gc_thresholds = None - - # some things used by the auth handler but not actually used in the - # pusher codebase - self.bcrypt_rounds = None - self.ldap_enabled = None - self.ldap_server = None - self.ldap_port = None - self.ldap_tls = None - self.ldap_search_base = None - self.ldap_search_property = None - self.ldap_email_property = None - self.ldap_full_name_property = None - - # We would otherwise try to use the registration shared secret as the - # macaroon shared secret if there was no macaroon_shared_secret, but - # that means pulling in RegistrationConfig too. We don't need to be - # backwards compaitible in the pusher codebase so just make people set - # macaroon_shared_secret. We set this to None to prevent it referencing - # an undefined key. - self.registration_shared_secret = None - - def default_config(self, server_name, **kwargs): - pid_file = self.abspath("pusher.pid") - return """\ - # Slave configuration - - # The replication listener on the synapse to talk to. - #replication_url: https://localhost:{replication_port}/_synapse/replication - - server_name: "%(server_name)s" - - listeners: [] - # Enable a ssh manhole listener on the pusher. - # - type: manhole - # port: {manhole_port} - # bind_address: 127.0.0.1 - # Enable a metric listener on the pusher. - # - type: http - # port: {metrics_port} - # bind_address: 127.0.0.1 - # resources: - # - names: ["metrics"] - # compress: False - - report_stats: False - - daemonize: False - - pid_file: %(pid_file)s - - """ % locals() - - -class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig): - pass - - class PusherSlaveStore( SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore @@ -232,8 +145,8 @@ class PusherServer(HomeServer): ) logger.info("Synapse pusher now listening on port %d", port) - def start_listening(self): - for listener in self.config.listeners: + def start_listening(self, listeners): + for listener in listeners: if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": @@ -329,19 +242,32 @@ class PusherServer(HomeServer): yield sleep(30) -def setup(config_options): +def setup(worker_name, config_options): try: - config = PusherSlaveConfig.load_config( + config = HomeServerConfig.load_config( "Synapse pusher", config_options ) except ConfigError as e: sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - if not config: - sys.exit(0) + worker_config = config.workers[worker_name] - config.setup_logging() + setup_logging(worker_config.log_config, worker_config.log_file) + + clobber_with_worker_config(config, worker_config) + + if config.start_pushers: + sys.stderr.write( + "\nThe pushers must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``start_pushers: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.start_pushers = True database_engine = create_engine(config.database_config) @@ -354,11 +280,15 @@ def setup(config_options): ) ps.setup() - ps.start_listening() - - change_resource_limit(ps.config.soft_file_limit) - if ps.config.gc_thresholds: - gc.set_threshold(*ps.config.gc_thresholds) + ps.start_listening(worker_config.listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(worker_config.soft_file_limit) + if worker_config.gc_thresholds: + ps.set_threshold(worker_config.gc_thresholds) + reactor.run() def start(): ps.replicate() @@ -367,30 +297,21 @@ def setup(config_options): reactor.callWhenRunning(start) - return ps + if worker_config.daemonize: + daemon = Daemonize( + app="synapse-pusher", + pid=worker_config.pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() if __name__ == '__main__': with LoggingContext("main"): - ps = setup(sys.argv[1:]) - - if ps.config.daemonize: - def run(): - with LoggingContext("run"): - change_resource_limit(ps.config.soft_file_limit) - if ps.config.gc_thresholds: - gc.set_threshold(*ps.config.gc_thresholds) - reactor.run() - - daemon = Daemonize( - app="synapse-pusher", - pid=ps.config.pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - - daemon.start() - else: - reactor.run() + worker_name = sys.argv[1] + ps = setup(worker_name, sys.argv[2:]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 297e199453..7a607faef6 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -18,9 +18,9 @@ import synapse from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError -from synapse.config.database import DatabaseConfig -from synapse.config.logger import LoggingConfig -from synapse.config.appservice import AppServiceConfig +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.config.workers import clobber_with_worker_config from synapse.events import FrozenEvent from synapse.handlers.presence import PresenceHandler from synapse.http.site import SynapseSite @@ -57,76 +57,11 @@ from daemonize import Daemonize import sys import logging import contextlib -import gc import ujson as json logger = logging.getLogger("synapse.app.synchrotron") -class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): - def read_config(self, config): - self.replication_url = config["replication_url"] - self.server_name = config["server_name"] - self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( - "use_insecure_ssl_client_just_for_testing_do_not_use", False - ) - self.user_agent_suffix = None - self.listeners = config["listeners"] - self.soft_file_limit = config.get("soft_file_limit") - self.daemonize = config.get("daemonize") - self.pid_file = self.abspath(config.get("pid_file")) - self.macaroon_secret_key = config["macaroon_secret_key"] - self.expire_access_token = config.get("expire_access_token", False) - - thresholds = config.get("gc_thresholds", None) - if thresholds is not None: - try: - assert len(thresholds) == 3 - self.gc_thresholds = ( - int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), - ) - except: - raise ConfigError( - "Value of `gc_threshold` must be a list of three integers if set" - ) - else: - self.gc_thresholds = None - - def default_config(self, server_name, **kwargs): - pid_file = self.abspath("synchroton.pid") - return """\ - # Slave configuration - - # The replication listener on the synapse to talk to. - #replication_url: https://localhost:{replication_port}/_synapse/replication - - server_name: "%(server_name)s" - - listeners: - # Enable a /sync listener on the synchrontron - #- type: http - # port: {http_port} - # bind_address: "" - # Enable a ssh manhole listener on the synchrotron - # - type: manhole - # port: {manhole_port} - # bind_address: 127.0.0.1 - # Enable a metric listener on the synchrotron - # - type: http - # port: {metrics_port} - # bind_address: 127.0.0.1 - # resources: - # - names: ["metrics"] - # compress: False - - report_stats: False - - daemonize: False - - pid_file: %(pid_file)s - """ % locals() - - class SynchrotronSlavedStore( SlavedPushRuleStore, SlavedEventStore, @@ -350,8 +285,8 @@ class SynchrotronServer(HomeServer): ) logger.info("Synapse synchrotron now listening on port %d", port) - def start_listening(self): - for listener in self.config.listeners: + def start_listening(self, listeners): + for listener in listeners: if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": @@ -470,19 +405,20 @@ class SynchrotronServer(HomeServer): return SynchrotronTyping(self) -def setup(config_options): +def start(worker_name, config_options): try: - config = SynchrotronConfig.load_config( + config = HomeServerConfig.load_config( "Synapse synchrotron", config_options ) except ConfigError as e: sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - if not config: - sys.exit(0) + worker_config = config.workers[worker_name] - config.setup_logging() + setup_logging(worker_config.log_config, worker_config.log_file) + + clobber_with_worker_config(config, worker_config) database_engine = create_engine(config.database_config) @@ -496,11 +432,15 @@ def setup(config_options): ) ss.setup() - ss.start_listening() - - change_resource_limit(ss.config.soft_file_limit) - if ss.config.gc_thresholds: - ss.set_threshold(*ss.config.gc_thresholds) + ss.start_listening(worker_config.listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(worker_config.soft_file_limit) + if worker_config.gc_thresholds: + ss.set_threshold(worker_config.gc_thresholds) + reactor.run() def start(): ss.get_datastore().start_profiling() @@ -508,30 +448,21 @@ def setup(config_options): reactor.callWhenRunning(start) - return ss + if worker_config.daemonize: + daemon = Daemonize( + app="synapse-synchrotron", + pid=worker_config.pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() if __name__ == '__main__': with LoggingContext("main"): - ss = setup(sys.argv[1:]) - - if ss.config.daemonize: - def run(): - with LoggingContext("run"): - change_resource_limit(ss.config.soft_file_limit) - if ss.config.gc_thresholds: - gc.set_threshold(*ss.config.gc_thresholds) - reactor.run() - - daemon = Daemonize( - app="synapse-synchrotron", - pid=ss.config.pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - - daemon.start() - else: - reactor.run() + worker_name = sys.argv[1] + start(worker_name, sys.argv[2:]) diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index fc2445484c..79b0534b3b 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -32,13 +32,15 @@ from .password import PasswordConfig from .jwt import JWTConfig from .ldap import LDAPConfig from .emailconfig import EmailConfig +from .workers import WorkerConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, - JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,): + JWTConfig, LDAPConfig, PasswordConfig, EmailConfig, + WorkerConfig,): pass diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 5047db898f..dc68683fbc 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -126,54 +126,58 @@ class LoggingConfig(Config): ) def setup_logging(self): - log_format = ( - "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" - " - %(message)s" - ) - if self.log_config is None: - - level = logging.INFO - level_for_storage = logging.INFO - if self.verbosity: - level = logging.DEBUG - if self.verbosity > 1: - level_for_storage = logging.DEBUG - - # FIXME: we need a logging.WARN for a -q quiet option - logger = logging.getLogger('') - logger.setLevel(level) - - logging.getLogger('synapse.storage').setLevel(level_for_storage) - - formatter = logging.Formatter(log_format) - if self.log_file: - # TODO: Customisable file size / backup count - handler = logging.handlers.RotatingFileHandler( - self.log_file, maxBytes=(1000 * 1000 * 100), backupCount=3 - ) - - def sighup(signum, stack): - logger.info("Closing log file due to SIGHUP") - handler.doRollover() - logger.info("Opened new log file due to SIGHUP") - - # TODO(paul): obviously this is a terrible mechanism for - # stealing SIGHUP, because it means no other part of synapse - # can use it instead. If we want to catch SIGHUP anywhere - # else as well, I'd suggest we find a nicer way to broadcast - # it around. - if getattr(signal, "SIGHUP"): - signal.signal(signal.SIGHUP, sighup) - else: - handler = logging.StreamHandler() - handler.setFormatter(formatter) - - handler.addFilter(LoggingContextFilter(request="")) - - logger.addHandler(handler) + setup_logging(self.log_config, self.log_file, self.verbosity) + + +def setup_logging(log_config=None, log_file=None, verbosity=None): + log_format = ( + "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" + " - %(message)s" + ) + if log_config is None: + + level = logging.INFO + level_for_storage = logging.INFO + if verbosity: + level = logging.DEBUG + if verbosity > 1: + level_for_storage = logging.DEBUG + + # FIXME: we need a logging.WARN for a -q quiet option + logger = logging.getLogger('') + logger.setLevel(level) + + logging.getLogger('synapse.storage').setLevel(level_for_storage) + + formatter = logging.Formatter(log_format) + if log_file: + # TODO: Customisable file size / backup count + handler = logging.handlers.RotatingFileHandler( + log_file, maxBytes=(1000 * 1000 * 100), backupCount=3 + ) + + def sighup(signum, stack): + logger.info("Closing log file due to SIGHUP") + handler.doRollover() + logger.info("Opened new log file due to SIGHUP") + + # TODO(paul): obviously this is a terrible mechanism for + # stealing SIGHUP, because it means no other part of synapse + # can use it instead. If we want to catch SIGHUP anywhere + # else as well, I'd suggest we find a nicer way to broadcast + # it around. + if getattr(signal, "SIGHUP"): + signal.signal(signal.SIGHUP, sighup) else: - with open(self.log_config, 'r') as f: - logging.config.dictConfig(yaml.load(f)) + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + handler.addFilter(LoggingContextFilter(request="")) + + logger.addHandler(handler) + else: + with open(log_config, 'r') as f: + logging.config.dictConfig(yaml.load(f)) - observer = PythonLoggingObserver() - observer.start() + observer = PythonLoggingObserver() + observer.start() diff --git a/synapse/config/server.py b/synapse/config/server.py index 44b8d422e0..f370b22c32 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -38,19 +38,7 @@ class ServerConfig(Config): self.listeners = config.get("listeners", []) - thresholds = config.get("gc_thresholds", None) - if thresholds is not None: - try: - assert len(thresholds) == 3 - self.gc_thresholds = ( - int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), - ) - except: - raise ConfigError( - "Value of `gc_threshold` must be a list of three integers if set" - ) - else: - self.gc_thresholds = None + self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None)) bind_port = config.get("bind_port") if bind_port: @@ -264,3 +252,20 @@ class ServerConfig(Config): type=int, help="Turn on the twisted telnet manhole" " service on the given port.") + + +def read_gc_thresholds(thresholds): + """Reads the three integer thresholds for garbage collection. Ensures that + the thresholds are integers if thresholds are supplied. + """ + if thresholds is None: + return None + try: + assert len(thresholds) == 3 + return ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) -- cgit 1.5.1 From bde13833cb42fc6e09928ffb4f4efad9244abffa Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Jun 2016 12:44:40 +0100 Subject: Access replication_url from the worker config directly --- synapse/app/pusher.py | 5 +++-- synapse/app/synchrotron.py | 5 +++-- synapse/config/workers.py | 4 ---- 3 files changed, 6 insertions(+), 8 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 6c8c02fb38..a26a3bd394 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -112,7 +112,7 @@ class PusherServer(HomeServer): def remove_pusher(self, app_id, push_key, user_id): http_client = self.get_simple_http_client() - replication_url = self.config.replication_url + replication_url = self.worker_config.replication_url url = replication_url + "/remove_pushers" return http_client.post_json_get_json(url, { "remove": [{ @@ -166,7 +166,7 @@ class PusherServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.config.replication_url + replication_url = self.worker_config.replication_url pusher_pool = self.get_pusherpool() clock = self.get_clock() @@ -275,6 +275,7 @@ def setup(worker_name, config_options): config.server_name, db_config=config.database_config, config=config, + worker_config=worker_config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, ) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7a607faef6..4443c73e6a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -98,7 +98,7 @@ class SynchrotronPresence(object): self.http_client = hs.get_simple_http_client() self.store = hs.get_datastore() self.user_to_num_current_syncs = {} - self.syncing_users_url = hs.config.replication_url + "/syncing_users" + self.syncing_users_url = hs.worker_config.replication_url + "/syncing_users" self.clock = hs.get_clock() active_presence = self.store.take_presence_startup_info() @@ -306,7 +306,7 @@ class SynchrotronServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.config.replication_url + replication_url = self.worker_config.replication_url clock = self.get_clock() notifier = self.get_notifier() presence_handler = self.get_presence_handler() @@ -426,6 +426,7 @@ def start(worker_name, config_options): config.server_name, db_config=config.database_config, config=config, + worker_config=worker_config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, application_service_handler=SynchrotronApplicationService(), diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 4f4658c0a8..f2c77ef59a 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -46,10 +46,6 @@ def clobber_with_worker_config(config, worker_config): # worker config directly. config.event_cache_size = worker_config.event_cache_size - # TODO: The replication_url should only be accessed within worker specific - # code so it really shouldn't need to be clobbered in the main config. - config.replication_url = worker_config.replication_url - def read_worker_config(config): return Worker( -- cgit 1.5.1 From 364d6167926d5d8b2a312e3d35623d2e05330e0a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Jun 2016 12:53:15 +0100 Subject: Access the event_cache_size directly from the server object. This means that the workers can override the event_cache_size directly without clobbering the value in the main synapse config. --- synapse/app/pusher.py | 6 +++--- synapse/app/synchrotron.py | 6 +++--- synapse/config/workers.py | 14 -------------- synapse/server.py | 3 +++ synapse/storage/_base.py | 2 +- 5 files changed, 10 insertions(+), 21 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index a26a3bd394..5d4db4f892 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -18,7 +18,6 @@ import synapse from synapse.server import HomeServer from synapse.config._base import ConfigError -from synapse.config.workers import clobber_with_worker_config from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig from synapse.http.site import SynapseSite @@ -241,6 +240,9 @@ class PusherServer(HomeServer): logger.exception("Error replicating from %r", replication_url) yield sleep(30) + def get_event_cache_size(self): + return self.worker_config.event_cache_size + def setup(worker_name, config_options): try: @@ -255,8 +257,6 @@ def setup(worker_name, config_options): setup_logging(worker_config.log_config, worker_config.log_file) - clobber_with_worker_config(config, worker_config) - if config.start_pushers: sys.stderr.write( "\nThe pushers must be disabled in the main synapse process" diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 4443c73e6a..d10bb2b3f0 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -20,7 +20,6 @@ from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.config.workers import clobber_with_worker_config from synapse.events import FrozenEvent from synapse.handlers.presence import PresenceHandler from synapse.http.site import SynapseSite @@ -404,6 +403,9 @@ class SynchrotronServer(HomeServer): def build_typing_handler(self): return SynchrotronTyping(self) + def get_event_cache_size(self): + return self.worker_config.event_cache_size + def start(worker_name, config_options): try: @@ -418,8 +420,6 @@ def start(worker_name, config_options): setup_logging(worker_config.log_config, worker_config.log_file) - clobber_with_worker_config(config, worker_config) - database_engine = create_engine(config.database_config) ss = SynchrotronServer( diff --git a/synapse/config/workers.py b/synapse/config/workers.py index f2c77ef59a..503358e03e 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -33,20 +33,6 @@ Worker = collections.namedtuple("Worker", [ ]) -def clobber_with_worker_config(config, worker_config): - """Overrides some of the keys of the main config with worker-specific - values. We only need to override the keys that are accessed deep - withing synapse code. Most of the keys that we want to override in - the workers are accessed in setup code that is rewritten specifically - for the workers. In that new code we can access the worker config directly, - so we don't need to override the values in the main config.""" - - # TODO: The event_cache_size is accessed in the db setup. It should be - # possible to rejigg that code so that the cache size is pulled from the - # worker config directly. - config.event_cache_size = worker_config.event_cache_size - - def read_worker_config(config): return Worker( app=config["app"], diff --git a/synapse/server.py b/synapse/server.py index dd4b81c658..b3c31ece73 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -236,6 +236,9 @@ class HomeServer(object): def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) + def get_event_cache_size(self): + return self.config.event_cache_size + def _make_dependency_method(depname): def _get(hs): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 32c6677d47..2932880cc5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -166,7 +166,7 @@ class SQLBaseStore(object): self._get_event_counters = PerformanceCounters() self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, - max_entries=hs.config.event_cache_size) + max_entries=hs.get_event_cache_size()) self._state_group_cache = DictionaryCache( "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR -- cgit 1.5.1 From a352b68acf473f59012340b7f481f3dfd6544ac6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 16 Jun 2016 17:29:50 +0100 Subject: Use worker_ prefixes for worker config, use existing support for multiple config files --- synapse/app/pusher.py | 29 ++++++++++++--------------- synapse/app/synchrotron.py | 29 ++++++++++++--------------- synapse/config/workers.py | 49 ++++++++-------------------------------------- synapse/server.py | 3 --- synapse/storage/_base.py | 2 +- 5 files changed, 33 insertions(+), 79 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 5d4db4f892..9ac26d52c6 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -111,7 +111,7 @@ class PusherServer(HomeServer): def remove_pusher(self, app_id, push_key, user_id): http_client = self.get_simple_http_client() - replication_url = self.worker_config.replication_url + replication_url = self.config.worker_replication_url url = replication_url + "/remove_pushers" return http_client.post_json_get_json(url, { "remove": [{ @@ -165,7 +165,7 @@ class PusherServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.worker_config.replication_url + replication_url = self.config.worker_replication_url pusher_pool = self.get_pusherpool() clock = self.get_clock() @@ -240,11 +240,8 @@ class PusherServer(HomeServer): logger.exception("Error replicating from %r", replication_url) yield sleep(30) - def get_event_cache_size(self): - return self.worker_config.event_cache_size - -def setup(worker_name, config_options): +def start(config_options): try: config = HomeServerConfig.load_config( "Synapse pusher", config_options @@ -253,9 +250,9 @@ def setup(worker_name, config_options): sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - worker_config = config.workers[worker_name] + assert config.worker_app == "synapse.app.pusher" - setup_logging(worker_config.log_config, worker_config.log_file) + setup_logging(config.worker_log_config, config.worker_log_file) if config.start_pushers: sys.stderr.write( @@ -275,20 +272,19 @@ def setup(worker_name, config_options): config.server_name, db_config=config.database_config, config=config, - worker_config=worker_config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, ) ps.setup() - ps.start_listening(worker_config.listeners) + ps.start_listening(config.worker_listeners) def run(): with LoggingContext("run"): logger.info("Running") - change_resource_limit(worker_config.soft_file_limit) - if worker_config.gc_thresholds: - ps.set_threshold(worker_config.gc_thresholds) + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + ps.set_threshold(config.gc_thresholds) reactor.run() def start(): @@ -298,10 +294,10 @@ def setup(worker_name, config_options): reactor.callWhenRunning(start) - if worker_config.daemonize: + if config.worker_daemonize: daemon = Daemonize( app="synapse-pusher", - pid=worker_config.pid_file, + pid=config.worker_pid_file, action=run, auto_close_fds=False, verbose=True, @@ -314,5 +310,4 @@ def setup(worker_name, config_options): if __name__ == '__main__': with LoggingContext("main"): - worker_name = sys.argv[1] - ps = setup(worker_name, sys.argv[2:]) + ps = start(sys.argv[1:]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d10bb2b3f0..160db8637e 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -97,7 +97,7 @@ class SynchrotronPresence(object): self.http_client = hs.get_simple_http_client() self.store = hs.get_datastore() self.user_to_num_current_syncs = {} - self.syncing_users_url = hs.worker_config.replication_url + "/syncing_users" + self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users" self.clock = hs.get_clock() active_presence = self.store.take_presence_startup_info() @@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.worker_config.replication_url + replication_url = self.config.worker_replication_url clock = self.get_clock() notifier = self.get_notifier() presence_handler = self.get_presence_handler() @@ -403,11 +403,8 @@ class SynchrotronServer(HomeServer): def build_typing_handler(self): return SynchrotronTyping(self) - def get_event_cache_size(self): - return self.worker_config.event_cache_size - -def start(worker_name, config_options): +def start(config_options): try: config = HomeServerConfig.load_config( "Synapse synchrotron", config_options @@ -416,9 +413,9 @@ def start(worker_name, config_options): sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - worker_config = config.workers[worker_name] + assert config.worker_app == "synapse.app.synchrotron" - setup_logging(worker_config.log_config, worker_config.log_file) + setup_logging(config.worker_log_config, config.worker_log_file) database_engine = create_engine(config.database_config) @@ -426,21 +423,20 @@ def start(worker_name, config_options): config.server_name, db_config=config.database_config, config=config, - worker_config=worker_config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, application_service_handler=SynchrotronApplicationService(), ) ss.setup() - ss.start_listening(worker_config.listeners) + ss.start_listening(config.worker_listeners) def run(): with LoggingContext("run"): logger.info("Running") - change_resource_limit(worker_config.soft_file_limit) - if worker_config.gc_thresholds: - ss.set_threshold(worker_config.gc_thresholds) + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + ss.set_threshold(config.gc_thresholds) reactor.run() def start(): @@ -449,10 +445,10 @@ def start(worker_name, config_options): reactor.callWhenRunning(start) - if worker_config.daemonize: + if config.worker_daemonize: daemon = Daemonize( app="synapse-synchrotron", - pid=worker_config.pid_file, + pid=config.worker_pid_file, action=run, auto_close_fds=False, verbose=True, @@ -465,5 +461,4 @@ def start(worker_name, config_options): if __name__ == '__main__': with LoggingContext("main"): - worker_name = sys.argv[1] - start(worker_name, sys.argv[2:]) + start(sys.argv[1:]) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 503358e03e..904789d155 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -13,52 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections - from ._base import Config -from .server import read_gc_thresholds - - -Worker = collections.namedtuple("Worker", [ - "app", - "listeners", - "pid_file", - "daemonize", - "log_file", - "log_config", - "event_cache_size", - "soft_file_limit", - "gc_thresholds", - "replication_url", -]) - - -def read_worker_config(config): - return Worker( - app=config["app"], - listeners=config.get("listeners", []), - pid_file=config.get("pid_file"), - daemonize=config["daemonize"], - log_file=config.get("log_file"), - log_config=config.get("log_config"), - event_cache_size=Config.parse_size(config.get("event_cache_size", "10K")), - soft_file_limit=config.get("soft_file_limit"), - gc_thresholds=read_gc_thresholds(config.get("gc_thresholds")), - replication_url=config.get("replication_url"), - ) class WorkerConfig(Config): """The workers are processes run separately to the main synapse process. - Each worker has a name that identifies it within the config file. They have their own pid_file and listener configuration. They use the - replication_url to talk to the main synapse process. They have their - own cache size tuning, gc threshold tuning and open file limits.""" + replication_url to talk to the main synapse process.""" def read_config(self, config): - workers = config.get("workers", {}) - - self.workers = { - worker_name: read_worker_config(worker_config) - for worker_name, worker_config in workers.items() - } + self.worker_app = config.get("worker_app") + self.worker_listeners = config.get("worker_listeners") + self.worker_daemonize = config.get("worker_daemonize") + self.worker_pid_file = config.get("worker_pid_file") + self.worker_log_file = config.get("worker_log_file") + self.worker_log_config = config.get("worker_log_config") + self.worker_replication_url = config.get("worker_replication_url") diff --git a/synapse/server.py b/synapse/server.py index b3c31ece73..dd4b81c658 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -236,9 +236,6 @@ class HomeServer(object): def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) - def get_event_cache_size(self): - return self.config.event_cache_size - def _make_dependency_method(depname): def _get(hs): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2932880cc5..32c6677d47 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -166,7 +166,7 @@ class SQLBaseStore(object): self._get_event_counters = PerformanceCounters() self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, - max_entries=hs.get_event_cache_size()) + max_entries=hs.config.event_cache_size) self._state_group_cache = DictionaryCache( "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR -- cgit 1.5.1 From 8c75040c25495bf29f4c76ca0fcc032975210012 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 17 Jun 2016 11:48:12 +0100 Subject: Fix setting gc thresholds in the workers --- synapse/app/pusher.py | 3 ++- synapse/app/synchrotron.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9ac26d52c6..4f1d18ab5f 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -43,6 +43,7 @@ from daemonize import Daemonize import sys import logging +import gc logger = logging.getLogger("synapse.app.pusher") @@ -284,7 +285,7 @@ def start(config_options): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: - ps.set_threshold(config.gc_thresholds) + gc.set_threshold(*config.gc_thresholds) reactor.run() def start(): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 160db8637e..8cf5bbbb6d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -56,6 +56,7 @@ from daemonize import Daemonize import sys import logging import contextlib +import gc import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -436,7 +437,7 @@ def start(config_options): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: - ss.set_threshold(config.gc_thresholds) + gc.set_threshold(*config.gc_thresholds) reactor.run() def start(): -- cgit 1.5.1 From 13e334506cf9093d2872ede95f1527c0c42d71fd Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 21 Jun 2016 11:47:39 +0100 Subject: Remove the legacy v0 content upload API. The existing content can still be downloaded. The last upload to the matrix.org server was in January 2015, so it is probably safe to remove the upload API. --- synapse/app/homeserver.py | 3 +- synapse/config/server.py | 20 ----- synapse/rest/media/v0/content_repository.py | 112 +--------------------------- 3 files changed, 3 insertions(+), 132 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 40ffd9bf0d..9c2dd32953 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -147,7 +147,7 @@ class SynapseHomeServer(HomeServer): MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo, CONTENT_REPO_PREFIX: ContentRepoResource( - self, self.config.uploads_path, self.auth, self.content_addr + self, self.config.uploads_path ), }) @@ -301,7 +301,6 @@ def setup(config_options): db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, config=config, - content_addr=config.content_addr, version_string=version_string, database_engine=database_engine, ) diff --git a/synapse/config/server.py b/synapse/config/server.py index 7840dc3ad6..d7e6f20518 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -107,26 +107,6 @@ class ServerConfig(Config): ] }) - # Attempt to guess the content_addr for the v0 content repostitory - content_addr = config.get("content_addr") - if not content_addr: - for listener in self.listeners: - if listener["type"] == "http" and not listener.get("tls", False): - unsecure_port = listener["port"] - break - else: - raise RuntimeError("Could not determine 'content_addr'") - - host = self.server_name - if ':' not in host: - host = "%s:%d" % (host, unsecure_port) - else: - host = host.split(':')[0] - host = "%s:%d" % (host, unsecure_port) - content_addr = "http://%s" % (host,) - - self.content_addr = content_addr - def default_config(self, server_name, **kwargs): if ":" in server_name: bind_port = int(server_name.split(":")[1]) diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py index d9fc045fc6..956bd5da75 100644 --- a/synapse/rest/media/v0/content_repository.py +++ b/synapse/rest/media/v0/content_repository.py @@ -15,14 +15,12 @@ from synapse.http.server import respond_with_json_bytes, finish_request -from synapse.util.stringutils import random_string from synapse.api.errors import ( - cs_exception, SynapseError, CodeMessageException, Codes, cs_error + Codes, cs_error ) from twisted.protocols.basic import FileSender from twisted.web import server, resource -from twisted.internet import defer import base64 import simplejson as json @@ -50,64 +48,10 @@ class ContentRepoResource(resource.Resource): """ isLeaf = True - def __init__(self, hs, directory, auth, external_addr): + def __init__(self, hs, directory): resource.Resource.__init__(self) self.hs = hs self.directory = directory - self.auth = auth - self.external_addr = external_addr.rstrip('/') - self.max_upload_size = hs.config.max_upload_size - - if not os.path.isdir(self.directory): - os.mkdir(self.directory) - logger.info("ContentRepoResource : Created %s directory.", - self.directory) - - @defer.inlineCallbacks - def map_request_to_name(self, request): - # auth the user - requester = yield self.auth.get_user_by_req(request) - - # namespace all file uploads on the user - prefix = base64.urlsafe_b64encode( - requester.user.to_string() - ).replace('=', '') - - # use a random string for the main portion - main_part = random_string(24) - - # suffix with a file extension if we can make one. This is nice to - # provide a hint to clients on the file information. We will also reuse - # this info to spit back the content type to the client. - suffix = "" - if request.requestHeaders.hasHeader("Content-Type"): - content_type = request.requestHeaders.getRawHeaders( - "Content-Type")[0] - suffix = "." + base64.urlsafe_b64encode(content_type) - if (content_type.split("/")[0].lower() in - ["image", "video", "audio"]): - file_ext = content_type.split("/")[-1] - # be a little paranoid and only allow a-z - file_ext = re.sub("[^a-z]", "", file_ext) - suffix += "." + file_ext - - file_name = prefix + main_part + suffix - file_path = os.path.join(self.directory, file_name) - logger.info("User %s is uploading a file to path %s", - request.user.user_id.to_string(), - file_path) - - # keep trying to make a non-clashing file, with a sensible max attempts - attempts = 0 - while os.path.exists(file_path): - main_part = random_string(24) - file_name = prefix + main_part + suffix - file_path = os.path.join(self.directory, file_name) - attempts += 1 - if attempts > 25: # really? Really? - raise SynapseError(500, "Unable to create file.") - - defer.returnValue(file_path) def render_GET(self, request): # no auth here on purpose, to allow anyone to view, even across home @@ -155,58 +99,6 @@ class ContentRepoResource(resource.Resource): return server.NOT_DONE_YET - def render_POST(self, request): - self._async_render(request) - return server.NOT_DONE_YET - def render_OPTIONS(self, request): respond_with_json_bytes(request, 200, {}, send_cors=True) return server.NOT_DONE_YET - - @defer.inlineCallbacks - def _async_render(self, request): - try: - # TODO: The checks here are a bit late. The content will have - # already been uploaded to a tmp file at this point - content_length = request.getHeader("Content-Length") - if content_length is None: - raise SynapseError( - msg="Request must specify a Content-Length", code=400 - ) - if int(content_length) > self.max_upload_size: - raise SynapseError( - msg="Upload request body is too large", - code=413, - ) - - fname = yield self.map_request_to_name(request) - - # TODO I have a suspicious feeling this is just going to block - with open(fname, "wb") as f: - f.write(request.content.read()) - - # FIXME (erikj): These should use constants. - file_name = os.path.basename(fname) - # FIXME: we can't assume what the repo's public mounted path is - # ...plus self-signed SSL won't work to remote clients anyway - # ...and we can't assume that it's SSL anyway, as we might want to - # serve it via the non-SSL listener... - url = "%s/_matrix/content/%s" % ( - self.external_addr, file_name - ) - - respond_with_json_bytes(request, 200, - json.dumps({"content_token": url}), - send_cors=True) - - except CodeMessageException as e: - logger.exception(e) - respond_with_json_bytes(request, e.code, - json.dumps(cs_exception(e))) - except Exception as e: - logger.error("Failed to store file: %s" % e) - respond_with_json_bytes( - request, - 500, - json.dumps({"error": "Internal server error"}), - send_cors=True) -- cgit 1.5.1 From 5cc7564c5c56880ff98af934b9169eac4fe895d3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 21 Jun 2016 16:38:05 +0100 Subject: Optionally start or stop workers in synctl. Optionally start or stop an individual worker by passing -w with the path to the worker config. Optionally start or stop every worker and the main synapse by passing -a with a path to a directory containing worker configs. The "-w" is intended to be used to bounce individual workers proceses. THe "-a" is intended for when you want to restart all the workers simultaneuously, for example when performing database upgrades. --- synapse/app/synctl.py | 178 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 151 insertions(+), 27 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 39f4bf6e53..bb41962d47 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -14,11 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys +import argparse +import collections +import glob import os import os.path -import subprocess import signal +import subprocess +import sys import yaml SYNAPSE = ["python", "-B", "-m", "synapse.app.homeserver"] @@ -28,60 +31,181 @@ RED = "\x1b[1;31m" NORMAL = "\x1b[m" +def write(message, colour=NORMAL, stream=sys.stdout): + if colour == NORMAL: + stream.write(message + "\n") + else: + stream.write(colour + message + NORMAL + "\n") + + def start(configfile): - print ("Starting ...") + write("Starting ...") args = SYNAPSE args.extend(["--daemonize", "-c", configfile]) try: subprocess.check_call(args) - print (GREEN + "started" + NORMAL) + write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN) + except subprocess.CalledProcessError as e: + write( + "error starting (exit code: %d); see above for logs" % e.returncode, + colour=RED, + ) + + +def start_worker(app, configfile, worker_configfile): + args = [ + "python", "-B", + "-m", app, + "-c", configfile, + "-c", worker_configfile + ] + + try: + subprocess.check_call(args) + write("started %s(%r)" % (app, worker_configfile), colour=GREEN) except subprocess.CalledProcessError as e: - print ( - RED + - "error starting (exit code: %d); see above for logs" % e.returncode + - NORMAL + write( + "error starting %s(%r) (exit code: %d); see above for logs" % ( + app, worker_configfile, e.returncode, + ), + colour=RED, ) -def stop(pidfile): +def stop(pidfile, app): if os.path.exists(pidfile): pid = int(open(pidfile).read()) os.kill(pid, signal.SIGTERM) - print (GREEN + "stopped" + NORMAL) + write("stopped %s" % (app,), colour=GREEN) + + +Worker = collections.namedtuple("Worker", [ + "app", "configfile", "pidfile", "cache_factor" +]) def main(): - configfile = sys.argv[2] if len(sys.argv) == 3 else "homeserver.yaml" + + parser = argparse.ArgumentParser() + + parser.add_argument( + "action", + choices=["start", "stop", "restart"], + help="whether to start, stop or restart the synapse", + ) + parser.add_argument( + "configfile", + nargs="?", + default="homeserver.yaml", + help="the homeserver config file, defaults to homserver.yaml", + ) + parser.add_argument( + "-w", "--worker", + metavar="WORKERCONFIG", + help="start or stop a single worker", + ) + parser.add_argument( + "-a", "--all-processes", + metavar="WORKERCONFIGDIR", + help="start or stop all the workers in the given directory" + " and the main synapse process", + ) + + options = parser.parse_args() + + if options.worker and options.all_processes: + write( + 'Cannot use "--worker" with "--all-processes"', + stream=sys.stderr + ) + sys.exit(1) + + configfile = options.configfile if not os.path.exists(configfile): - sys.stderr.write( + write( "No config file found\n" "To generate a config file, run '%s -c %s --generate-config" " --server-name='\n" % ( - " ".join(SYNAPSE), configfile - ) + " ".join(SYNAPSE), options.configfile + ), + stream=sys.stderr, ) sys.exit(1) - config = yaml.load(open(configfile)) + with open(configfile) as stream: + config = yaml.load(stream) + pidfile = config["pid_file"] - cache_factor = config.get("synctl_cache_factor", None) + cache_factor = config.get("synctl_cache_factor") + start_stop_synapse = True if cache_factor: os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) - action = sys.argv[1] if sys.argv[1:] else "usage" - if action == "start": - start(configfile) - elif action == "stop": - stop(pidfile) - elif action == "restart": - stop(pidfile) - start(configfile) - else: - sys.stderr.write("Usage: %s [start|stop|restart] [configfile]\n" % (sys.argv[0],)) - sys.exit(1) + worker_configfiles = [] + if options.worker: + start_stop_synapse = False + worker_configfile = options.worker + if not os.path.exists(worker_configfile): + write( + "No worker config found at %r" % (worker_configfile,), + stream=sys.stderr, + ) + sys.exit(1) + worker_configfiles.append(worker_configfile) + + if options.all_processes: + worker_configdir = options.all_processes + if not os.path.isdir(worker_configdir): + write( + "No worker config directory found at %r" % (worker_configdir,), + stream=sys.stderr, + ) + sys.exit(1) + worker_configfiles.extend(sorted(glob.glob( + os.path.join(worker_configdir, "*.yaml") + ))) + + workers = [] + for worker_configfile in worker_configfiles: + with open(worker_configfile) as stream: + worker_config = yaml.load(stream) + worker_app = worker_config["worker_app"] + worker_pidfile = worker_config["worker_pid_file"] + worker_daemonize = worker_config["worker_daemonize"] + assert worker_daemonize # TODO print something more user friendly + worker_cache_factor = worker_config.get("synctl_cache_factor") + workers.append(Worker( + worker_app, worker_configfile, worker_pidfile, worker_cache_factor, + )) + + action = options.action + + if action == "stop" or action == "restart": + for worker in workers: + stop(worker.pidfile, worker.app) + + if start_stop_synapse: + stop(pidfile, "synapse.app.homeserver") + + # TODO: Wait for synapse to actually shutdown before starting it again + + if action == "start" or action == "restart": + if start_stop_synapse: + start(configfile) + + for worker in workers: + if worker.cache_factor: + os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor) + + start_worker(worker.app, configfile, worker.configfile) + + if cache_factor: + os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) + else: + os.environ.pop("SYNAPSE_CACHE_FACTOR", None) if __name__ == "__main__": -- cgit 1.5.1 From 66868119dc3c42c3cc6ea0b41ade81285ef1c9de Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 Jul 2016 15:47:28 +0100 Subject: Add metrics for psutil derived memory usage --- synapse/app/homeserver.py | 3 +++ synapse/metrics/__init__.py | 9 ++++++++- synapse/metrics/metric.py | 38 ++++++++++++++++++++++++++++++++++++++ synapse/python_dependencies.py | 1 + 4 files changed, 50 insertions(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 9c2dd32953..fe68ceb07c 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -51,6 +51,7 @@ from synapse.api.urls import ( from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.util.logcontext import LoggingContext +from synapse.metrics import register_memory_metrics from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX from synapse.federation.transport.server import TransportLayerServer @@ -335,6 +336,8 @@ def setup(config_options): hs.get_datastore().start_doing_background_updates() hs.get_replication_layer().start_get_pdu_cache() + register_memory_metrics(hs) + reactor.callWhenRunning(start) return hs diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index bdd7292a30..cce3dba47c 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -27,7 +27,8 @@ import gc from twisted.internet import reactor from .metric import ( - CounterMetric, CallbackMetric, DistributionMetric, CacheMetric + CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, + MemoryUsageMetric, ) @@ -66,6 +67,12 @@ class Metrics(object): return self._register(CacheMetric, *args, **kwargs) +def register_memory_metrics(hs): + metric = MemoryUsageMetric(hs) + all_metrics.append(metric) + return metric + + def get_metrics_for(pkg_name): """ Returns a Metrics instance for conveniently creating metrics namespaced with the given name prefix. """ diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 341043952a..d100841a7f 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -16,6 +16,8 @@ from itertools import chain +import psutil + # TODO(paul): I can't believe Python doesn't have one of these def map_concat(func, items): @@ -153,3 +155,39 @@ class CacheMetric(object): """%s:total{name="%s"} %d""" % (self.name, self.cache_name, total), """%s:size{name="%s"} %d""" % (self.name, self.cache_name, size), ] + + +class MemoryUsageMetric(object): + """Keeps track of the current memory usage, using psutil. + + The class will keep the current min/max/sum/counts of rss over the last + WINDOW_SIZE_SEC, by polling UPDATE_HZ times per second + """ + + UPDATE_HZ = 2 # number of times to get memory per second + WINDOW_SIZE_SEC = 30 # the size of the window in seconds + + def __init__(self, hs): + clock = hs.get_clock() + self.memory_snapshots = [] + self.process = psutil.Process() + + clock.looping_call(self._update_curr_values, 1000 / self.UPDATE_HZ) + + def _update_curr_values(self): + max_size = self.UPDATE_HZ * self.WINDOW_SIZE_SEC + self.memory_snapshots.append(self.process.memory_info().rss) + self.memory_snapshots[:] = self.memory_snapshots[-max_size:] + + def render(self): + max_rss = max(self.memory_snapshots) + min_rss = min(self.memory_snapshots) + sum_rss = sum(self.memory_snapshots) + len_rss = len(self.memory_snapshots) + + return [ + "process_psutil_rss:max %d" % max_rss, + "process_psutil_rss:min %d" % min_rss, + "process_psutil_rss:total %d" % sum_rss, + "process_psutil_rss:count %d" % len_rss, + ] diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index e024cec0a2..799d35da5e 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -36,6 +36,7 @@ REQUIREMENTS = { "blist": ["blist"], "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"], "pymacaroons-pynacl": ["pymacaroons"], + "psutil>=2.0.0": ["psutil>=2.0.0"], } CONDITIONAL_REQUIREMENTS = { "web_client": { -- cgit 1.5.1 From aede7248ab04118b83d7787547b9cf3fd615e7ad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 17:37:44 +0100 Subject: Split out a FederationReader process --- synapse/app/federation_reader.py | 200 ++++++++++++++++++++++++++++ synapse/replication/slave/storage/events.py | 5 + synapse/replication/slave/storage/keys.py | 29 ++++ synapse/storage/keys.py | 4 + 4 files changed, 238 insertions(+) create mode 100644 synapse/app/federation_reader.py create mode 100644 synapse/replication/slave/storage/keys.py (limited to 'synapse/app') diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py new file mode 100644 index 0000000000..98a18f9b3d --- /dev/null +++ b/synapse/app/federation_reader.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.api.urls import FEDERATION_PREFIX +from synapse.federation.transport.server import TransportLayerServer +from synapse.crypto import context_factory + + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + +logger = logging.getLogger("synapse.app.federation_reader") + + +class FederationReaderSlavedStore( + SlavedEventStore, + SlavedKeyStore, + BaseSlavedStore, +): + pass + + +class FederationReaderServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "federation": + resources.update({ + FEDERATION_PREFIX: TransportLayerServer(self), + }) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse federation reader now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.worker_replication_url + + while True: + try: + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + yield store.process_replication(result) + except: + logger.exception("Error replicating from %r", replication_url) + yield sleep(5) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse federation reader", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.federation_reader" + + setup_logging(config.worker_log_config, config.worker_log_file) + + database_engine = create_engine(config.database_config) + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ss = FederationReaderServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + ) + + ss.setup() + ss.get_handlers() + ss.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ss.get_datastore().start_profiling() + ss.replicate() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-federation-reader", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 369d839464..2ba1e6b803 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -142,6 +142,11 @@ class SlavedEventStore(BaseSlavedStore): _get_events_around_txn = DataStore._get_events_around_txn.__func__ _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ + get_backfill_events = DataStore.get_backfill_events.__func__ + _get_backfill_events = DataStore._get_backfill_events.__func__ + get_missing_events = DataStore.get_missing_events.__func__ + _get_missing_events = DataStore._get_missing_events.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py new file mode 100644 index 0000000000..c1c895439d --- /dev/null +++ b/synapse/replication/slave/storage/keys.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.keys import KeyStore + + +class SlavedKeyStore(BaseSlavedStore): + # TODO: use the cached version and invalidate deleted tokens + get_all_server_verify_keys = defer.inlineCallbacks(KeyStore.__dict__[ + "get_all_server_verify_keys" + ].orig) + + get_server_verify_keys = DataStore.get_server_verify_keys.__func__ diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index a495a8a7d9..1195efec08 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -22,6 +22,10 @@ import OpenSSL from signedjson.key import decode_verify_key_bytes import hashlib +import logging + +logger = logging.getLogger(__name__) + class KeyStore(SQLBaseStore): """Persistence for signature verification keys and tls X.509 certificates -- cgit 1.5.1 From 05e7e5e972446b639997f0ea461c2eea39617342 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 11:59:08 +0100 Subject: Fix flake8 violation Apparently flake8 v3 puts the error on a different line to v2. Easiest way to make sure that happens is by putting the whole statement on one line :) --- synapse/app/__init__.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py index 1bc4279807..9c2b627590 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py @@ -16,13 +16,11 @@ import sys sys.dont_write_bytecode = True -from synapse.python_dependencies import ( - check_requirements, MissingRequirementError -) # NOQA +from synapse import python_dependencies # noqa: E402 try: - check_requirements() -except MissingRequirementError as e: + python_dependencies.check_requirements() +except python_dependencies.MissingRequirementError as e: message = "\n".join([ "Missing Requirement: %s" % (e.message,), "To install run:", -- cgit 1.5.1 From 76b89d0edb9df7c5d8b595b85ff895367631fdf2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 17:03:40 +0100 Subject: Add slace storage functions for public room list --- synapse/app/federation_reader.py | 4 ++++ synapse/replication/slave/storage/directory.py | 23 +++++++++++++++++++++++ synapse/replication/slave/storage/room.py | 21 +++++++++++++++++++++ 3 files changed, 48 insertions(+) create mode 100644 synapse/replication/slave/storage/directory.py create mode 100644 synapse/replication/slave/storage/room.py (limited to 'synapse/app') diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 98a18f9b3d..2e5ba09014 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -24,6 +24,8 @@ from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import sleep @@ -52,6 +54,8 @@ logger = logging.getLogger("synapse.app.federation_reader") class FederationReaderSlavedStore( SlavedEventStore, SlavedKeyStore, + RoomStore, + DirectoryStore, BaseSlavedStore, ): pass diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py new file mode 100644 index 0000000000..5fbe3a303a --- /dev/null +++ b/synapse/replication/slave/storage/directory.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage.directory import DirectoryStore + + +class DirectoryStore(BaseSlavedStore): + get_aliases_for_room = DirectoryStore.__dict__[ + "get_aliases_for_room" + ].orig diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py new file mode 100644 index 0000000000..d5bb0f98ea --- /dev/null +++ b/synapse/replication/slave/storage/room.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore + + +class RoomStore(BaseSlavedStore): + get_public_room_ids = DataStore.get_public_room_ids.__func__ -- cgit 1.5.1 From ec8b217722be15fe110be77c7c7909a7758202cb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 17:35:53 +0100 Subject: Add destination retry to slave store --- synapse/app/federation_reader.py | 2 ++ synapse/replication/slave/storage/transactions.py | 30 +++++++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 synapse/replication/slave/storage/transactions.py (limited to 'synapse/app') diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 2e5ba09014..58d425f9ac 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -25,6 +25,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -56,6 +57,7 @@ class FederationReaderSlavedStore( SlavedKeyStore, RoomStore, DirectoryStore, + TransactionStore, BaseSlavedStore, ): pass diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py new file mode 100644 index 0000000000..6f2ba98af5 --- /dev/null +++ b/synapse/replication/slave/storage/transactions.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.transactions import TransactionStore + + +class TransactionStore(BaseSlavedStore): + get_destination_retry_timings = TransactionStore.__dict__[ + "get_destination_retry_timings" + ].orig + _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ + + # For now, don't record the destination rety timings + def set_destination_retry_timings(*args, **kwargs): + return defer.succeed(None) -- cgit 1.5.1 From 24f36469bc5c634ff49c87e49e32579d6ac43d7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Aug 2016 16:36:07 +0100 Subject: Add federation /version API --- synapse/app/federation_reader.py | 2 +- synapse/app/homeserver.py | 2 +- synapse/app/pusher.py | 2 +- synapse/app/synchrotron.py | 2 +- synapse/federation/transport/server.py | 18 +++++++++++++++++- synapse/util/versionstring.py | 8 ++++---- 6 files changed, 25 insertions(+), 9 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 58d425f9ac..7355499ae2 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -165,7 +165,7 @@ def start(config_options): db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, config=config, - version_string=get_version_string("Synapse", synapse), + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, ) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fe68ceb07c..40e6f65236 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -285,7 +285,7 @@ def setup(config_options): # check any extra requirements we have now we have a config check_requirements(config) - version_string = get_version_string("Synapse", synapse) + version_string = "Synapse/" + get_version_string(synapse) logger.info("Server hostname: %s", config.server_name) logger.info("Server version: %s", version_string) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 4f1d18ab5f..c8dde0fcb8 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -273,7 +273,7 @@ def start(config_options): config.server_name, db_config=config.database_config, config=config, - version_string=get_version_string("Synapse", synapse), + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, ) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 8cf5bbbb6d..215ccfd522 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -424,7 +424,7 @@ def start(config_options): config.server_name, db_config=config.database_config, config=config, - version_string=get_version_string("Synapse", synapse), + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, application_service_handler=SynchrotronApplicationService(), ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index ee8f94e340..37c0d4fbc4 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -20,11 +20,12 @@ from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource from synapse.http.servlet import parse_json_object_from_request from synapse.util.ratelimitutils import FederationRateLimiter +from synapse.util.versionstring import get_version_string import functools import logging -import simplejson as json import re +import synapse logger = logging.getLogger(__name__) @@ -557,6 +558,20 @@ class PublicRoomList(BaseFederationServlet): defer.returnValue((200, data)) +class FederationVersionServlet(BaseFederationServlet): + PATH = "/version" + + REQUIRE_AUTH = False + + def on_GET(self, origin, content, query): + return defer.succeed((200, { + "server": { + "name": "Synapse", + "version": get_version_string(synapse) + }, + })) + + SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -580,6 +595,7 @@ SERVLET_CLASSES = ( On3pidBindServlet, OpenIdUserInfo, PublicRoomList, + FederationVersionServlet, ) diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py index a4f156cb3b..52086df465 100644 --- a/synapse/util/versionstring.py +++ b/synapse/util/versionstring.py @@ -21,7 +21,7 @@ import logging logger = logging.getLogger(__name__) -def get_version_string(name, module): +def get_version_string(module): try: null = open(os.devnull, 'w') cwd = os.path.dirname(os.path.abspath(module.__file__)) @@ -74,11 +74,11 @@ def get_version_string(name, module): ) return ( - "%s/%s (%s)" % ( - name, module.__version__, git_version, + "%s (%s)" % ( + module.__version__, git_version, ) ).encode("ascii") except Exception as e: logger.info("Failed to check for git repository: %s", e) - return ("%s/%s" % (name, module.__version__,)).encode("ascii") + return module.__version__.encode("ascii") -- cgit 1.5.1