From 36c28bc467e53cf7318964a4dc986428644f27ed Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 27 Mar 2017 16:33:44 +0100
Subject: Update all the workers and master to use TCP replication

---
 synapse/app/pusher.py | 125 +++++++++++++++++++-------------------------------
 1 file changed, 46 insertions(+), 79 deletions(-)

(limited to 'synapse/app/pusher.py')

diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index ab682e52ec..cb76f058b0 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -27,9 +27,9 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
 from synapse.storage import DataStore
-from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, preserve_fn, \
     PreserveLoggingContext
@@ -39,7 +39,7 @@ from synapse.util.versionstring import get_version_string
 
 from synapse import events
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -89,7 +89,6 @@ class PusherSlaveStore(
 
 
 class PusherServer(HomeServer):
-
     def get_db_conn(self, run_new_connection=True):
         # Any param beginning with cp_ is a parameter for adbapi, and should
         # not be passed to the database engine.
@@ -109,16 +108,7 @@ class PusherServer(HomeServer):
         logger.info("Finished setting up.")
 
     def remove_pusher(self, app_id, push_key, user_id):
-        http_client = self.get_simple_http_client()
-        replication_url = self.config.worker_replication_url
-        url = replication_url + "/remove_pushers"
-        return http_client.post_json_get_json(url, {
-            "remove": [{
-                "app_id": app_id,
-                "push_key": push_key,
-                "user_id": user_id,
-            }]
-        })
+        self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -166,73 +156,51 @@ class PusherServer(HomeServer):
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
-    @defer.inlineCallbacks
-    def replicate(self):
-        http_client = self.get_simple_http_client()
-        store = self.get_datastore()
-        replication_url = self.config.worker_replication_url
-        pusher_pool = self.get_pusherpool()
-
-        def stop_pusher(user_id, app_id, pushkey):
-            key = "%s:%s" % (app_id, pushkey)
-            pushers_for_user = pusher_pool.pushers.get(user_id, {})
-            pusher = pushers_for_user.pop(key, None)
-            if pusher is None:
-                return
-            logger.info("Stopping pusher %r / %r", user_id, key)
-            pusher.on_stop()
-
-        def start_pusher(user_id, app_id, pushkey):
-            key = "%s:%s" % (app_id, pushkey)
-            logger.info("Starting pusher %r / %r", user_id, key)
-            return pusher_pool._refresh_pusher(app_id, pushkey, user_id)
-
-        @defer.inlineCallbacks
-        def poke_pushers(results):
-            pushers_rows = set(
-                map(tuple, results.get("pushers", {}).get("rows", []))
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return PusherReplicationHandler(self)
+
+
+class PusherReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(PusherReplicationHandler, self).__init__(hs.get_datastore())
+
+        self.pusher_pool = hs.get_pusherpool()
+
+    def on_rdata(self, stream_name, token, rows):
+        super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+        self.poke_pushers(stream_name, token, rows)
+
+    def poke_pushers(self, stream_name, token, rows):
+        if stream_name == "pushers":
+            for row in rows:
+                if row.deleted:
+                    self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+                else:
+                    self.start_pusher(row.user_id, row.app_id, row.pushkey)
+        elif stream_name == "events":
+            preserve_fn(self.pusher_pool.on_new_notifications)(
+                token, token,
             )
-            deleted_pushers_rows = set(
-                map(tuple, results.get("deleted_pushers", {}).get("rows", []))
+        elif stream_name == "receipts":
+            preserve_fn(self.pusher_pool.on_new_receipts)(
+                token, token, set(row.room_id for row in rows)
             )
-            for row in sorted(pushers_rows | deleted_pushers_rows):
-                if row in deleted_pushers_rows:
-                    user_id, app_id, pushkey = row[1:4]
-                    stop_pusher(user_id, app_id, pushkey)
-                elif row in pushers_rows:
-                    user_id = row[1]
-                    app_id = row[5]
-                    pushkey = row[8]
-                    yield start_pusher(user_id, app_id, pushkey)
-
-            stream = results.get("events")
-            if stream and stream["rows"]:
-                min_stream_id = stream["rows"][0][0]
-                max_stream_id = stream["position"]
-                preserve_fn(pusher_pool.on_new_notifications)(
-                    min_stream_id, max_stream_id
-                )
-
-            stream = results.get("receipts")
-            if stream and stream["rows"]:
-                rows = stream["rows"]
-                affected_room_ids = set(row[1] for row in rows)
-                min_stream_id = rows[0][0]
-                max_stream_id = stream["position"]
-                preserve_fn(pusher_pool.on_new_receipts)(
-                    min_stream_id, max_stream_id, affected_room_ids
-                )
-
-        while True:
-            try:
-                args = store.stream_positions()
-                args["timeout"] = 30000
-                result = yield http_client.get_json(replication_url, args=args)
-                yield store.process_replication(result)
-                poke_pushers(result)
-            except:
-                logger.exception("Error replicating from %r", replication_url)
-                yield sleep(30)
+
+    def stop_pusher(self, user_id, app_id, pushkey):
+        key = "%s:%s" % (app_id, pushkey)
+        pushers_for_user = self.pusher_pool.pushers.get(user_id, {})
+        pusher = pushers_for_user.pop(key, None)
+        if pusher is None:
+            return
+        logger.info("Stopping pusher %r / %r", user_id, key)
+        pusher.on_stop()
+
+    def start_pusher(self, user_id, app_id, pushkey):
+        key = "%s:%s" % (app_id, pushkey)
+        logger.info("Starting pusher %r / %r", user_id, key)
+        return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
 
 
 def start(config_options):
@@ -288,7 +256,6 @@ def start(config_options):
         reactor.run()
 
     def start():
-        ps.replicate()
         ps.get_pusherpool().start()
        ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
--
cgit 1.4.1


From ac66e11f2b1235f801195ad0065008cdca2f1b0b Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 4 Apr 2017 15:22:54 +0100
Subject: Add the appropriate amount of preserve_fn

---
 synapse/app/appservice.py        |  6 ++++--
 synapse/app/federation_sender.py |  4 ++--
 synapse/app/pusher.py            | 13 +++++++------
 synapse/app/synchrotron.py       | 13 ++++++-------
 4 files changed, 19 insertions(+), 17 deletions(-)

(limited to 'synapse/app/pusher.py')

diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 13b69d8dee..9a476efa63 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -136,7 +136,9 @@ class ASReplicationHandler(ReplicationClientHandler):
 
         if stream_name == "events":
             max_stream_id = self.store.get_room_max_stream_ordering()
-            self.appservice_handler.notify_interested_services(max_stream_id)
+            preserve_fn(
+                self.appservice_handler.notify_interested_services
+            )(max_stream_id)
 
 
 def start(config_options):
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 705fe5e3df..fa7b19e53a 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -36,7 +36,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -337,7 +337,7 @@ class FederationSenderHandler(object):
             for destination in device_destinations:
                 self.federation_sender.send_device_messages(destination)
 
-            self.update_token(token)
+            preserve_fn(self.update_token)(token)
 
         # We also need to poke the federation sender when new events happen
         elif stream_name == "events":
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index cb76f058b0..f9114acfcb 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -39,7 +39,7 @@ from synapse.util.versionstring import get_version_string
 
 from synapse import events
 
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -170,21 +170,22 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
-        self.poke_pushers(stream_name, token, rows)
+        preserve_fn(self.poke_pushers)(stream_name, token, rows)
 
+    @defer.inlineCallbacks
     def poke_pushers(self, stream_name, token, rows):
         if stream_name == "pushers":
             for row in rows:
                 if row.deleted:
-                    self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+                    yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
                 else:
-                    self.start_pusher(row.user_id, row.app_id, row.pushkey)
+                    yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
         elif stream_name == "events":
-            preserve_fn(self.pusher_pool.on_new_notifications)(
+            yield self.pusher_pool.on_new_notifications(
                 token, token,
             )
         elif stream_name == "receipts":
-            preserve_fn(self.pusher_pool.on_new_receipts)(
+            yield self.pusher_pool.on_new_receipts(
                 token, token, set(row.room_id for row in rows)
             )
 
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 7341970a81..a1ef5dfa77 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -47,7 +47,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.presence import PresenceStore, UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
@@ -328,11 +328,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
-        if stream_name == "typing":
-            self.typing_handler.process_replication_rows(token, rows)
-        elif stream_name == "presence":
-            self.presence_handler.process_replication_rows(token, rows)
-        self.notify(stream_name, token, rows)
+        preserve_fn(self.process_and_notify)(stream_name, token, rows)
 
     def get_streams_to_replicate(self):
         args = super(SyncReplicationHandler, self).get_streams_to_replicate()
@@ -343,7 +339,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
         return self.presence_handler.get_currently_syncing_users()
 
     @defer.inlineCallbacks
-    def notify(self, stream_name, token, rows):
+    def process_and_notify(self, stream_name, token, rows):
         if stream_name == "events":
             # We shouldn't get multiple rows per token for events stream, so
             # we don't need to optimise this for multiple rows.
@@ -369,6 +365,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
                 "receipt_key", token, rooms=[row.room_id for row in rows],
            )
         elif stream_name == "typing":
+            self.typing_handler.process_replication_rows(token, rows)
             self.notifier.on_new_event(
                 "typing_key", token, rooms=[row.room_id for row in rows],
             )
@@ -386,6 +383,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
             self.notifier.on_new_event(
                 "device_list_key", token, rooms=all_room_ids,
             )
+        elif stream_name == "presence":
+            yield self.presence_handler.process_replication_rows(token, rows)
 
 
 def start(config_options):
--
cgit 1.4.1
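
Taken together, the two patches settle on a single worker-side pattern: subclass ReplicationClientHandler, let the base class record the new stream position in on_rdata, then hand the actual processing off through preserve_fn so a slow or deferred-returning consumer cannot stall the TCP replication connection. The sketch below is an illustrative condensation of that pattern, not code from the repository; the class name ExampleReplicationHandler and the process_rows helper are invented for the example, while ReplicationClientHandler, preserve_fn, defer.inlineCallbacks and the pusher-pool call are the APIs already used in the diffs above.

    from twisted.internet import defer

    from synapse.replication.tcp.client import ReplicationClientHandler
    from synapse.util.logcontext import preserve_fn


    class ExampleReplicationHandler(ReplicationClientHandler):
        """Illustrative handler following the pattern used by the workers above."""

        def __init__(self, hs):
            super(ExampleReplicationHandler, self).__init__(hs.get_datastore())
            self.pusher_pool = hs.get_pusherpool()

        def on_rdata(self, stream_name, token, rows):
            # The base class advances its stream positions first.
            super(ExampleReplicationHandler, self).on_rdata(stream_name, token, rows)
            # preserve_fn runs the work in the background with a sane log
            # context, so processing never blocks the replication connection.
            preserve_fn(self.process_rows)(stream_name, token, rows)

        @defer.inlineCallbacks
        def process_rows(self, stream_name, token, rows):
            if stream_name == "receipts":
                # Mirrors PusherReplicationHandler.poke_pushers for receipts.
                yield self.pusher_pool.on_new_receipts(
                    token, token, set(row.room_id for row in rows)
                )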