From abb151f3c9bf78f2825dba18da6bbc88ce61d32c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 11:57:26 +0100 Subject: Add a separate process that can handle /sync requests --- synapse/app/synchrotron.py | 467 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 467 insertions(+) create mode 100644 synapse/app/synchrotron.py (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py new file mode 100644 index 0000000000..f592ad352e --- /dev/null +++ b/synapse/app/synchrotron.py @@ -0,0 +1,467 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.api.constants import EventTypes +from synapse.config._base import ConfigError +from synapse.config.database import DatabaseConfig +from synapse.config.logger import LoggingConfig +from synapse.config.appservice import AppServiceConfig +from synapse.events import FrozenEvent +from synapse.handlers.presence import PresenceHandler +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.rest.client.v2_alpha import sync +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.storage.presence import UserPresenceState +from synapse.storage.roommember import RoomMemberStore +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.stringutils import random_string +from synapse.util.versionstring import get_version_string + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import contextlib +import ujson as json + +logger = logging.getLogger("synapse.app.synchrotron") + + +class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): + def read_config(self, config): + self.replication_url = config["replication_url"] + self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") + self.daemonize = config.get("daemonize") + self.pid_file = self.abspath(config.get("pid_file")) + self.macaroon_secret_key = config["macaroon_secret_key"] + self.expire_access_token = config.get("expire_access_token", False) + + def default_config(self, server_name, **kwargs): + pid_file = self.abspath("synchroton.pid") + return """\ + # Slave configuration + + # The replication listener on the synapse to talk to. + #replication_url: https://localhost:{replication_port}/_synapse/replication + + server_name: "%(server_name)s" + + listeners: + # Enable a /sync listener on the synchrontron + #- type: http + # port: {http_port} + # bind_address: "" + # Enable a ssh manhole listener on the synchrotron + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Enable a metric listener on the synchrotron + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"] + # compress: False + + report_stats: False + + daemonize: False + + pid_file: %(pid_file)s + """ % locals() + + +class SynchrotronSlavedStore( + SlavedPushRuleStore, + SlavedEventStore, + SlavedReceiptsStore, + SlavedAccountDataStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + SlavedFilteringStore, + SlavedPresenceStore, +): + def get_presence_list_accepted(self, user_localpart): + return () + + def insert_client_ip(self, user, access_token, ip, user_agent): + pass + + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + + +class SynchrotronPresence(object): + def __init__(self, hs): + self.http_client = hs.get_simple_http_client() + self.store = hs.get_datastore() + self.user_to_num_current_syncs = {} + self.syncing_users_url = hs.config.replication_url + "/syncing_users" + self.clock = hs.get_clock() + + active_presence = self.store.take_presence_startup_info() + self.user_to_current_state = { + state.user_id: state + for state in active_presence + } + + self.process_id = random_string(16) + logger.info("Presence process_id is %r", self.process_id) + + def set_state(self, user, state): + # TODO Hows this supposed to work? + pass + + get_states = PresenceHandler.get_states.__func__ + current_state_for_users = PresenceHandler.current_state_for_users.__func__ + + @defer.inlineCallbacks + def user_syncing(self, user_id, affect_presence): + if affect_presence: + curr_sync = self.user_to_num_current_syncs.get(user_id, 0) + self.user_to_num_current_syncs[user_id] = curr_sync + 1 + # TODO: Send this less frequently. + # TODO: Make sure this doesn't race. Currently we can lose updates + # if two users come online in quick sucession and the second http + # to the master completes before the first. + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users() + + def _end(): + if affect_presence: + self.user_to_num_current_syncs[user_id] -= 1 + + @contextlib.contextmanager + def _user_syncing(): + try: + yield + finally: + _end() + + defer.returnValue(_user_syncing()) + + def _send_syncing_users(self): + return self.http_client.post_json_get_json(self.syncing_users_url, { + "process_id": self.process_id, + "syncing_users": [ + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count > 0 + ], + }) + + def process_replication(self, result): + stream = result.get("presence", {"rows": []}) + for row in stream["rows"]: + ( + position, user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) = row + self.user_to_current_state[user_id] = UserPresenceState( + user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) + + +class SynchrotronTyping(object): + def __init__(self, hs): + self._latest_room_serial = 0 + self._room_serials = {} + self._room_typing = {} + + def stream_positions(self): + return {"typing": self._latest_room_serial} + + def process_replication(self, result): + stream = result.get("typing") + if stream: + self._latest_room_serial = int(stream["position"]) + + for row in stream["rows"]: + position, room_id, typing_json = row + typing = json.loads(typing_json) + self._room_serials[room_id] = position + self._room_typing[room_id] = typing + + +class SynchrotronApplicationService(object): + def notify_interested_services(self, event): + pass + + +class SynchrotronServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + sync.register_servlets(self, resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse synchrotron now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + clock = self.get_clock() + notifier = self.get_notifier() + presence_handler = self.get_presence_handler() + typing_handler = self.get_typing_handler() + + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + def notify_from_stream( + result, stream_name, stream_key, room=None, user=None + ): + stream = result.get(stream_name) + if stream: + position_index = stream["field_names"].index("position") + if room: + room_index = stream["field_names"].index(room) + if user: + user_index = stream["field_names"].index(user) + + users = () + rooms = () + for row in stream["rows"]: + position = row[position_index] + + if user: + users = (row[user_index],) + + if room: + rooms = (row[room_index],) + + notifier.on_new_event( + stream_key, position, users=users, rooms=rooms + ) + + def notify(result): + stream = result.get("events") + if stream: + max_position = stream["position"] + for row in stream["rows"]: + position = row[0] + internal = json.loads(row[1]) + event_json = json.loads(row[2]) + event = FrozenEvent(event_json, internal_metadata_dict=internal) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + notifier.on_new_room_event( + event, position, max_position, extra_users + ) + + notify_from_stream( + result, "push_rules", "push_rules_key", user="user_id" + ) + notify_from_stream( + result, "user_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "room_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "tag_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "receipts", "receipt_key", room="room_id" + ) + notify_from_stream( + result, "typing", "typing_key", room="room_id" + ) + + next_expire_broken_caches_ms = 0 + while True: + try: + args = store.stream_positions() + args.update(typing_handler.stream_positions()) + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) + yield store.process_replication(result) + typing_handler.process_replication(result) + presence_handler.process_replication(result) + notify(result) + except: + logger.exception("Error replicating from %r", replication_url) + sleep(5) + + def build_presence_handler(self): + return SynchrotronPresence(self) + + def build_typing_handler(self): + return SynchrotronTyping(self) + + +def setup(config_options): + try: + config = SynchrotronConfig.load_config( + "Synapse synchrotron", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + if not config: + sys.exit(0) + + config.setup_logging() + + database_engine = create_engine(config.database_config) + + ss = SynchrotronServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + application_service_handler=SynchrotronApplicationService(), + ) + + ss.setup() + ss.start_listening() + + change_resource_limit(ss.config.soft_file_limit) + + def start(): + ss.get_datastore().start_profiling() + ss.replicate() + + reactor.callWhenRunning(start) + + return ss + + +if __name__ == '__main__': + with LoggingContext("main"): + ps = setup(sys.argv[1:]) + + if ps.config.daemonize: + def run(): + with LoggingContext("run"): + change_resource_limit(ps.config.soft_file_limit) + reactor.run() + + daemon = Daemonize( + app="synapse-pusher", + pid=ps.config.pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + + daemon.start() + else: + reactor.run() -- cgit 1.4.1 From 80aade380545a0b661e2bbef48e175900ed4d41f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:24:19 +0100 Subject: Send updates to the syncing users every ten seconds or immediately if they've just come online --- synapse/app/synchrotron.py | 53 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 10 deletions(-) (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f592ad352e..7b45c87a96 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -16,7 +16,7 @@ import synapse -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig @@ -41,7 +41,7 @@ from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -135,6 +135,8 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["who_forgot_in_room"] ) +UPDATE_SYNCING_USERS_MS = 10 * 1000 + class SynchrotronPresence(object): def __init__(self, hs): @@ -153,6 +155,13 @@ class SynchrotronPresence(object): self.process_id = random_string(16) logger.info("Presence process_id is %r", self.process_id) + self._sending_sync = False + self._need_to_send_sync = False + self.clock.looping_call( + self._send_syncing_users_regularly, + UPDATE_SYNCING_USERS_MS, + ) + def set_state(self, user, state): # TODO Hows this supposed to work? pass @@ -165,12 +174,10 @@ class SynchrotronPresence(object): if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 - # TODO: Send this less frequently. - # TODO: Make sure this doesn't race. Currently we can lose updates - # if two users come online in quick sucession and the second http - # to the master completes before the first. - # TODO: Don't block the sync request on this HTTP hit. - yield self._send_syncing_users() + prev_states = yield self.current_state_for_users([user_id]) + if prev_states[user_id].state == PresenceState.OFFLINE: + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users_now() def _end(): if affect_presence: @@ -185,8 +192,24 @@ class SynchrotronPresence(object): defer.returnValue(_user_syncing()) - def _send_syncing_users(self): - return self.http_client.post_json_get_json(self.syncing_users_url, { + def _send_syncing_users_regularly(self): + # Only send an update if we aren't in the middle of sending one. + if not self._sending_sync: + preserve_fn(self._send_syncing_users_now)() + + @defer.inlineCallbacks + def _send_syncing_users_now(self): + if self._sending_sync: + # We don't want to race with sending another update. + # Instead we wait for that update to finish and send another + # update afterwards. + self._need_to_send_sync = True + return + + # Flag that we are sending an update. + self._sending_sync = True + + yield self.http_client.post_json_get_json(self.syncing_users_url, { "process_id": self.process_id, "syncing_users": [ user_id for user_id, count in self.user_to_num_current_syncs.items() @@ -194,6 +217,16 @@ class SynchrotronPresence(object): ], }) + # Unset the flag as we are no longer sending an update. + self._sending_sync = False + if self._need_to_send_sync: + # If something happened while we were sending the update then + # we might need to send another update. + # TODO: Check if the update that was sent matches the current state + # as we only need to send an update if they are different. + self._need_to_send_sync = False + yield self._send_syncing_users_now() + def process_replication(self, result): stream = result.get("presence", {"rows": []}) for row in stream["rows"]: -- cgit 1.4.1 From 0b3c80a234cd8f16c8714af7e7b719dc2e635b20 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:55:01 +0100 Subject: Use ClientIpStore to record client ips --- synapse/app/synchrotron.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7b45c87a96..0446a1643d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -27,6 +27,7 @@ from synapse.http.site import SynapseSite from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.rest.client.v2_alpha import sync +from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore @@ -36,6 +37,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.server import HomeServer +from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore @@ -119,13 +121,12 @@ class SynchrotronSlavedStore( SlavedRegistrationStore, SlavedFilteringStore, SlavedPresenceStore, + BaseSlavedStore, + ClientIpStore, # After BaseSlavedStre because the constructor is different ): def get_presence_list_accepted(self, user_localpart): return () - def insert_client_ip(self, user, access_token, ip, user_agent): - pass - # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a # way to invalidate the forgotten rooms cache correctly. -- cgit 1.4.1 From da491e75b2d46c885f7fbb9240501c223e7c59bd Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:56:36 +0100 Subject: Appease flake8 --- synapse/app/synchrotron.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 0446a1643d..af06ce70d1 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -122,7 +122,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, BaseSlavedStore, - ClientIpStore, # After BaseSlavedStre because the constructor is different + ClientIpStore, # After BaseSlavedStre because the constructor is different ): def get_presence_list_accepted(self, user_localpart): return () -- cgit 1.4.1 From 48340e4f13a8090feac070ebb507e7629d03b530 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 15:02:27 +0100 Subject: Clear the list of ongoing syncs on shutdown --- synapse/app/synchrotron.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index af06ce70d1..f4b416f777 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -163,6 +163,8 @@ class SynchrotronPresence(object): UPDATE_SYNCING_USERS_MS, ) + reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown) + def set_state(self, user, state): # TODO Hows this supposed to work? pass @@ -193,6 +195,13 @@ class SynchrotronPresence(object): defer.returnValue(_user_syncing()) + @defer.inlineCallbacks + def _on_shutdown(self): + # When the synchrotron is shutdown tell the master to clear the in + # progress syncs for this process + self.user_to_num_current_syncs.clear() + yield self._send_syncing_users_now() + def _send_syncing_users_regularly(self): # Only send an update if we aren't in the middle of sending one. if not self._sending_sync: -- cgit 1.4.1 From 8f79084bd44f76223048c1bd6d836f904edcc95e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 18:03:40 +0100 Subject: Add get_presence_list_accepted to the broken caches in synchrotron --- synapse/app/synchrotron.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f4b416f777..c77854fab1 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -39,7 +39,7 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine -from synapse.storage.presence import UserPresenceState +from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree @@ -124,9 +124,6 @@ class SynchrotronSlavedStore( BaseSlavedStore, ClientIpStore, # After BaseSlavedStre because the constructor is different ): - def get_presence_list_accepted(self, user_localpart): - return () - # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a # way to invalidate the forgotten rooms cache correctly. @@ -136,6 +133,13 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["who_forgot_in_room"] ) + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + UPDATE_SYNCING_USERS_MS = 10 * 1000 @@ -357,6 +361,7 @@ class SynchrotronServer(HomeServer): def expire_broken_caches(): store.who_forgot_in_room.invalidate_all() + store.get_presence_list_accepted.invalidate_all() def notify_from_stream( result, stream_name, stream_key, room=None, user=None -- cgit 1.4.1 From ac9716f1546ae486cac435b8a577cc2c54b666d6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 18:10:00 +0100 Subject: Fix spelling --- synapse/app/synchrotron.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index c77854fab1..aa81e1c5da 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -122,7 +122,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, BaseSlavedStore, - ClientIpStore, # After BaseSlavedStre because the constructor is different + ClientIpStore, # After BaseSlavedStore because the constructor is different ): # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a -- cgit 1.4.1 From 5ef84da4f11f1b1cceb0c44d9867bb597ee68e64 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 6 Jun 2016 16:05:28 +0100 Subject: Yield on the sleeps intended to backoff replication --- synapse/app/pusher.py | 2 +- synapse/app/synchrotron.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index f1de1e7ce9..3c3fa38053 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -311,7 +311,7 @@ class PusherServer(HomeServer): poke_pushers(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(30) + yield sleep(30) def setup(config_options): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index aa81e1c5da..7273055cc1 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -443,7 +443,7 @@ class SynchrotronServer(HomeServer): notify(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(5) + yield sleep(5) def build_presence_handler(self): return SynchrotronPresence(self) -- cgit 1.4.1 From 4a5bbb1941ae63f1d6632aa35e80274e56c8dbb9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 6 Jun 2016 16:37:12 +0100 Subject: Fix a KeyError in the synchrotron presence --- synapse/app/synchrotron.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index aa81e1c5da..3d0d5cc15a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -187,7 +187,10 @@ class SynchrotronPresence(object): yield self._send_syncing_users_now() def _end(): - if affect_presence: + # We check that the user_id is in user_to_num_current_syncs because + # user_to_num_current_syncs may have been cleared if we are + # shutting down. + if affect_presence and user_id in self.user_to_num_current_syncs: self.user_to_num_current_syncs[user_id] -= 1 @contextlib.contextmanager -- cgit 1.4.1 From dded389ac16ec023c986df400d25ca94a4a28677 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 15:45:56 +0100 Subject: Allow setting of gc.set_thresholds --- synapse/app/homeserver.py | 5 +++++ synapse/app/pusher.py | 5 +++++ synapse/app/synchrotron.py | 15 ++++++++++----- synapse/config/server.py | 19 ++++++++++++++++++- 4 files changed, 38 insertions(+), 6 deletions(-) (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index df675c0ed4..22e1721fc4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -16,6 +16,7 @@ import synapse +import gc import logging import os import sys @@ -351,6 +352,8 @@ class SynapseService(service.Service): def startService(self): hs = setup(self.config) change_resource_limit(hs.config.soft_file_limit) + if hs.config.gc_thresholds: + gc.set_threshold(*hs.config.gc_thresholds) def stopService(self): return self._port.stopListening() @@ -422,6 +425,8 @@ def run(hs): # sys.settrace(logcontext_tracer) with LoggingContext("run"): change_resource_limit(hs.config.soft_file_limit) + if hs.config.gc_thresholds: + gc.set_threshold(*hs.config.gc_thresholds) reactor.run() if hs.config.daemonize: diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 3c3fa38053..7e2bf7ecc2 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -43,6 +43,7 @@ from twisted.web.resource import Resource from daemonize import Daemonize +import gc import sys import logging @@ -342,6 +343,8 @@ def setup(config_options): ps.start_listening() change_resource_limit(ps.config.soft_file_limit) + if ps.config.gc_thresholds: + gc.set_threshold(*ps.config.gc_thresholds) def start(): ps.replicate() @@ -361,6 +364,8 @@ if __name__ == '__main__': def run(): with LoggingContext("run"): change_resource_limit(ps.config.soft_file_limit) + if ps.config.gc_thresholds: + gc.set_threshold(*ps.config.gc_thresholds) reactor.run() daemon = Daemonize( diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 5c552ffb29..f9673ab8d8 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -57,6 +57,7 @@ from daemonize import Daemonize import sys import logging import contextlib +import gc import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -484,6 +485,8 @@ def setup(config_options): ss.start_listening() change_resource_limit(ss.config.soft_file_limit) + if ss.config.gc_thresholds: + ss.set_threshold(*ss.config.gc_thresholds) def start(): ss.get_datastore().start_profiling() @@ -496,17 +499,19 @@ def setup(config_options): if __name__ == '__main__': with LoggingContext("main"): - ps = setup(sys.argv[1:]) + ss = setup(sys.argv[1:]) - if ps.config.daemonize: + if ss.config.daemonize: def run(): with LoggingContext("run"): - change_resource_limit(ps.config.soft_file_limit) + change_resource_limit(ss.config.soft_file_limit) + if ss.config.gc_thresholds: + gc.set_threshold(*ss.config.gc_thresholds) reactor.run() daemon = Daemonize( - app="synapse-pusher", - pid=ps.config.pid_file, + app="synapse-synchrotron", + pid=ss.config.pid_file, action=run, auto_close_fds=False, verbose=True, diff --git a/synapse/config/server.py b/synapse/config/server.py index c2d8f8a52f..44b8d422e0 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import Config +from ._base import Config, ConfigError class ServerConfig(Config): @@ -38,6 +38,20 @@ class ServerConfig(Config): self.listeners = config.get("listeners", []) + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + bind_port = config.get("bind_port") if bind_port: self.listeners = [] @@ -157,6 +171,9 @@ class ServerConfig(Config): # hard limit. soft_file_limit: 0 + # The GC threshold parameters to pass to `gc.set_threshold`, if defined + # gc_thresholds: [700, 10, 10] + # A list of other Home Servers to fetch the public room directory from # and include in the public room directory of this home server # This is a temporary stopgap solution to populate new server with a -- cgit 1.4.1 From 2d1d1025fac846e2746dc627c0ebb6542c1488d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 16:26:25 +0100 Subject: Add gc_threshold to pusher and synchrotron --- synapse/app/pusher.py | 14 ++++++++++++++ synapse/app/synchrotron.py | 14 ++++++++++++++ 2 files changed, 28 insertions(+) (limited to 'synapse/app/synchrotron.py') diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 7e2bf7ecc2..4ec23d84c1 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -65,6 +65,20 @@ class SlaveConfig(DatabaseConfig): self.pid_file = self.abspath(config.get("pid_file")) self.public_baseurl = config["public_baseurl"] + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + # some things used by the auth handler but not actually used in the # pusher codebase self.bcrypt_rounds = None diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f9673ab8d8..297e199453 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -78,6 +78,20 @@ class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): self.macaroon_secret_key = config["macaroon_secret_key"] self.expire_access_token = config.get("expire_access_token", False) + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + def default_config(self, server_name, **kwargs): pid_file = self.abspath("synchroton.pid") return """\ -- cgit 1.4.1