From 36c28bc467e53cf7318964a4dc986428644f27ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 16:33:44 +0100 Subject: Update all the workers and master to use TCP replication --- synapse/app/federation_sender.py | 110 ++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 48 deletions(-) (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 76c4cc54d1..8994891aeb 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -31,9 +31,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState -from synapse.util.async import sleep +from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.manhole import manhole @@ -59,7 +60,23 @@ class FederationSenderSlaveStore( SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, SlavedRegistrationStore, SlavedDeviceStore, ): - pass + def __init__(self, db_conn, hs): + super(FederationSenderSlaveStore, self).__init__(db_conn, hs) + self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) + + def _get_federation_out_pos(self, db_conn): + sql = ( + "SELECT stream_id FROM federation_stream_position" + " WHERE type = ?" + ) + sql = self.database_engine.convert_param_style(sql) + + txn = db_conn.cursor() + txn.execute(sql, ("federation",)) + rows = txn.fetchall() + txn.close() + + return rows[0][0] if rows else -1 class FederationSenderServer(HomeServer): @@ -127,26 +144,29 @@ class FederationSenderServer(HomeServer): else: logger.warn("Unrecognized listener type: %s", listener["type"]) - @defer.inlineCallbacks - def replicate(self): - http_client = self.get_simple_http_client() - store = self.get_datastore() - replication_url = self.config.worker_replication_url - send_handler = FederationSenderHandler(self) - - send_handler.on_start() - - while True: - try: - args = store.stream_positions() - args.update((yield send_handler.stream_positions())) - args["timeout"] = 30000 - result = yield http_client.get_json(replication_url, args=args) - yield store.process_replication(result) - yield send_handler.process_replication(result) - except: - logger.exception("Error replicating from %r", replication_url) - yield sleep(30) + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return FederationSenderReplicationHandler(self) + + +class FederationSenderReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) + self.send_handler = FederationSenderHandler(hs) + + def on_rdata(self, stream_name, token, rows): + super(FederationSenderReplicationHandler, self).on_rdata( + stream_name, token, rows + ) + self.send_handler.process_replication_rows(stream_name, token, rows) + if stream_name == "federation": + self.send_federation_ack(token) + + def get_streams_to_replicate(self): + args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate() + args.update(self.send_handler.stream_positions()) + return args def start(config_options): @@ -205,7 +225,6 @@ def start(config_options): reactor.run() def start(): - ps.replicate() ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() @@ -233,6 +252,9 @@ class FederationSenderHandler(object): self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() + self.federation_position = self.store.federation_out_pos_startup + self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") + self._room_serials = {} self._room_typing = {} @@ -243,25 +265,13 @@ class FederationSenderHandler(object): self.store.get_room_max_stream_ordering() ) - @defer.inlineCallbacks def stream_positions(self): - stream_id = yield self.store.get_federation_out_pos("federation") - defer.returnValue({ - "federation": stream_id, - - # Ack stuff we've "processed", this should only be called from - # one process. - "federation_ack": stream_id, - }) + return {"federation": self.federation_position} - @defer.inlineCallbacks - def process_replication(self, result): + def process_replication_rows(self, stream_name, token, rows): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. - fed_stream = result.get("federation") - if fed_stream: - latest_id = int(fed_stream["position"]) - + if stream_name == "federation": # The federation stream containis a bunch of different types of # rows that need to be handled differently. We parse the rows, put # them into the appropriate collection and then send them off. @@ -272,8 +282,9 @@ class FederationSenderHandler(object): device_destinations = set() # Parse the rows in the stream - for row in fed_stream["rows"]: - position, typ, content_js = row + for row in rows: + typ = row.type + content_js = row.data content = json.loads(content_js) if typ == send_queue.PRESENCE_TYPE: @@ -325,16 +336,19 @@ class FederationSenderHandler(object): for destination in device_destinations: self.federation_sender.send_device_messages(destination) - # Record where we are in the stream. - yield self.store.update_federation_out_pos( - "federation", latest_id - ) + self.update_token(token) # We also need to poke the federation sender when new events happen - event_stream = result.get("events") - if event_stream: - latest_pos = event_stream["position"] - self.federation_sender.notify_new_events(latest_pos) + elif stream_name == "events": + self.federation_sender.notify_new_events(token) + + @defer.inlineCallbacks + def update_token(self, token): + self.federation_position = token + with (yield self._fed_position_linearizer.queue(None)): + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) if __name__ == '__main__': -- cgit 1.4.1 From 6ce6bbedcb8721fc99200ecace94eeedb51da6f2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Apr 2017 10:29:29 +0100 Subject: Move where we ack federation --- synapse/app/federation_sender.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 8994891aeb..705fe5e3df 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -153,15 +153,13 @@ class FederationSenderServer(HomeServer): class FederationSenderReplicationHandler(ReplicationClientHandler): def __init__(self, hs): super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) - self.send_handler = FederationSenderHandler(hs) + self.send_handler = FederationSenderHandler(hs, self) def on_rdata(self, stream_name, token, rows): super(FederationSenderReplicationHandler, self).on_rdata( stream_name, token, rows ) self.send_handler.process_replication_rows(stream_name, token, rows) - if stream_name == "federation": - self.send_federation_ack(token) def get_streams_to_replicate(self): args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate() @@ -248,13 +246,16 @@ class FederationSenderHandler(object): """Processes the replication stream and forwards the appropriate entries to the federation sender. """ - def __init__(self, hs): + def __init__(self, hs, replication_client): self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() + self.replication_client = replication_client self.federation_position = self.store.federation_out_pos_startup self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") + self._last_ack = self.federation_position + self._room_serials = {} self._room_typing = {} @@ -345,10 +346,18 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def update_token(self, token): self.federation_position = token + + # We linearize here to ensure we don't have races updating the token with (yield self._fed_position_linearizer.queue(None)): - yield self.store.update_federation_out_pos( - "federation", self.federation_position - ) + if self._last_ack < self.federation_position: + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) + + # We ACK this token over replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack(self.federation_position) + self._last_ack = self.federation_position if __name__ == '__main__': -- cgit 1.4.1 From ac66e11f2b1235f801195ad0065008cdca2f1b0b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Apr 2017 15:22:54 +0100 Subject: Add the appropriate amount of preserve_fn --- synapse/app/appservice.py | 6 ++++-- synapse/app/federation_sender.py | 4 ++-- synapse/app/pusher.py | 13 +++++++------ synapse/app/synchrotron.py | 13 ++++++------- 4 files changed, 19 insertions(+), 17 deletions(-) (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 13b69d8dee..9a476efa63 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -136,7 +136,9 @@ class ASReplicationHandler(ReplicationClientHandler): if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() - self.appservice_handler.notify_interested_services(max_stream_id) + preserve_fn( + self.appservice_handler.notify_interested_services + )(max_stream_id) def start(config_options): diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 705fe5e3df..fa7b19e53a 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -36,7 +36,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -337,7 +337,7 @@ class FederationSenderHandler(object): for destination in device_destinations: self.federation_sender.send_device_messages(destination) - self.update_token(token) + preserve_fn(self.update_token)(token) # We also need to poke the federation sender when new events happen elif stream_name == "events": diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index cb76f058b0..f9114acfcb 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -39,7 +39,7 @@ from synapse.util.versionstring import get_version_string from synapse import events -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import Resource from daemonize import Daemonize @@ -170,21 +170,22 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - self.poke_pushers(stream_name, token, rows) + preserve_fn(self.poke_pushers)(stream_name, token, rows) + @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): if stream_name == "pushers": for row in rows: if row.deleted: - self.stop_pusher(row.user_id, row.app_id, row.pushkey) + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) else: - self.start_pusher(row.user_id, row.app_id, row.pushkey) + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) elif stream_name == "events": - preserve_fn(self.pusher_pool.on_new_notifications)( + yield self.pusher_pool.on_new_notifications( token, token, ) elif stream_name == "receipts": - preserve_fn(self.pusher_pool.on_new_receipts)( + yield self.pusher_pool.on_new_receipts( token, token, set(row.room_id for row in rows) ) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7341970a81..a1ef5dfa77 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -47,7 +47,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -328,11 +328,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) - if stream_name == "typing": - self.typing_handler.process_replication_rows(token, rows) - elif stream_name == "presence": - self.presence_handler.process_replication_rows(token, rows) - self.notify(stream_name, token, rows) + preserve_fn(self.process_and_notify)(stream_name, token, rows) def get_streams_to_replicate(self): args = super(SyncReplicationHandler, self).get_streams_to_replicate() @@ -343,7 +339,7 @@ class SyncReplicationHandler(ReplicationClientHandler): return self.presence_handler.get_currently_syncing_users() @defer.inlineCallbacks - def notify(self, stream_name, token, rows): + def process_and_notify(self, stream_name, token, rows): if stream_name == "events": # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. @@ -369,6 +365,7 @@ class SyncReplicationHandler(ReplicationClientHandler): "receipt_key", token, rooms=[row.room_id for row in rows], ) elif stream_name == "typing": + self.typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( "typing_key", token, rooms=[row.room_id for row in rows], ) @@ -386,6 +383,8 @@ class SyncReplicationHandler(ReplicationClientHandler): self.notifier.on_new_event( "device_list_key", token, rooms=all_room_ids, ) + elif stream_name == "presence": + yield self.presence_handler.process_replication_rows(token, rows) def start(config_options): -- cgit 1.4.1 From d1d5362267adb316e65f046787207aa7d862ec3b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Apr 2017 16:41:03 +0100 Subject: Add comment --- synapse/app/federation_sender.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index fa7b19e53a..cbddc80ca9 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -62,6 +62,11 @@ class FederationSenderSlaveStore( ): def __init__(self, db_conn, hs): super(FederationSenderSlaveStore, self).__init__(db_conn, hs) + + # We pull out the current federation stream position now so that we + # always have a known value for the federation position in memory so + # that we don't have to bounce via a deferred once when we start the + # replication streams. self.federation_out_pos_startup = self._get_federation_out_pos(db_conn) def _get_federation_out_pos(self, db_conn): -- cgit 1.4.1 From f10ce8944b8ef4c33694654c397bfcda44c6124a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 11:10:28 +0100 Subject: Don't double json encode federation replication data --- synapse/app/federation_sender.py | 4 +--- synapse/federation/send_queue.py | 19 +++++++++---------- 2 files changed, 10 insertions(+), 13 deletions(-) (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index cbddc80ca9..145c01f3a3 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -51,7 +51,6 @@ from daemonize import Daemonize import sys import logging import gc -import ujson as json logger = logging.getLogger("synapse.app.appservice") @@ -290,8 +289,7 @@ class FederationSenderHandler(object): # Parse the rows in the stream for row in rows: typ = row.type - content_js = row.data - content = json.loads(content_js) + content = row.data if typ == send_queue.PRESENCE_TYPE: destination = content["destination"] diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 4bde66fbf8..78c852ed69 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -35,7 +35,6 @@ from synapse.util.metrics import Measure import synapse.metrics from blist import sorteddict -import ujson metrics = synapse.metrics.get_metrics_for(__name__) @@ -258,10 +257,10 @@ class FederationRemoteSendQueue(object): ) for (key, (dest, user_id)) in dest_user_ids: - rows.append((key, PRESENCE_TYPE, ujson.dumps({ + rows.append((key, PRESENCE_TYPE, { "destination": dest, "state": self.presence_map[user_id].as_dict(), - }))) + })) # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() @@ -271,10 +270,10 @@ class FederationRemoteSendQueue(object): for (pos, (destination, edu_key)) in keyed_edus: rows.append( - (pos, KEYED_EDU_TYPE, ujson.dumps({ + (pos, KEYED_EDU_TYPE, { "key": edu_key, "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), - })) + }) ) # Fetch changed edus @@ -284,7 +283,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) + rows.append((pos, EDU_TYPE, edu.get_internal_dict())) # Fetch changed failures keys = self.failures.keys() @@ -293,10 +292,10 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: - rows.append((pos, FAILURE_TYPE, ujson.dumps({ + rows.append((pos, FAILURE_TYPE, { "destination": destination, "failure": failure, - }))) + })) # Fetch changed device messages keys = self.device_messages.keys() @@ -305,9 +304,9 @@ class FederationRemoteSendQueue(object): device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: - rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ + rows.append((pos, DEVICE_MESSAGE_TYPE, { "destination": destination, - }))) + })) # Sort rows based on pos rows.sort() -- cgit 1.4.1 From 449d1297cae8bb05140b257f6e08e4cc036bf481 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Apr 2017 11:48:27 +0100 Subject: Fix up federation SendQueue and document types --- synapse/app/federation_sender.py | 66 +---------- synapse/federation/send_queue.py | 246 ++++++++++++++++++++++++++++++++++----- 2 files changed, 221 insertions(+), 91 deletions(-) (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 145c01f3a3..477e16e0fa 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -23,7 +23,6 @@ from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.http.site import SynapseSite from synapse.federation import send_queue -from synapse.federation.units import Edu from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.events import SlavedEventStore @@ -33,7 +32,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine -from synapse.storage.presence import UserPresenceState from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -277,69 +275,7 @@ class FederationSenderHandler(object): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": - # The federation stream containis a bunch of different types of - # rows that need to be handled differently. We parse the rows, put - # them into the appropriate collection and then send them off. - presence_to_send = {} - keyed_edus = {} - edus = {} - failures = {} - device_destinations = set() - - # Parse the rows in the stream - for row in rows: - typ = row.type - content = row.data - - if typ == send_queue.PRESENCE_TYPE: - destination = content["destination"] - state = UserPresenceState.from_dict(content["state"]) - - presence_to_send.setdefault(destination, []).append(state) - elif typ == send_queue.KEYED_EDU_TYPE: - key = content["key"] - edu = Edu(**content["edu"]) - - keyed_edus.setdefault( - edu.destination, {} - )[(edu.destination, tuple(key))] = edu - elif typ == send_queue.EDU_TYPE: - edu = Edu(**content) - - edus.setdefault(edu.destination, []).append(edu) - elif typ == send_queue.FAILURE_TYPE: - destination = content["destination"] - failure = content["failure"] - - failures.setdefault(destination, []).append(failure) - elif typ == send_queue.DEVICE_MESSAGE_TYPE: - device_destinations.add(content["destination"]) - else: - raise Exception("Unrecognised federation type: %r", typ) - - # We've finished collecting, send everything off - for destination, states in presence_to_send.items(): - self.federation_sender.send_presence(destination, states) - - for destination, edu_map in keyed_edus.items(): - for key, edu in edu_map.items(): - self.federation_sender.send_edu( - edu.destination, edu.edu_type, edu.content, key=key, - ) - - for destination, edu_list in edus.items(): - for edu in edu_list: - self.federation_sender.send_edu( - edu.destination, edu.edu_type, edu.content, key=None, - ) - - for destination, failure_list in failures.items(): - for failure in failure_list: - self.federation_sender.send_failure(destination, failure) - - for destination in device_destinations: - self.federation_sender.send_device_messages(destination) - + send_queue.process_rows_for_federation(self.federation_sender, rows) preserve_fn(self.update_token)(token) # We also need to poke the federation sender when new events happen diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 78c852ed69..8a6392c697 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -31,22 +31,17 @@ Events are replicated via a separate events stream. from .units import Edu +from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure import synapse.metrics from blist import sorteddict +from collections import namedtuple metrics = synapse.metrics.get_metrics_for(__name__) -PRESENCE_TYPE = "p" -KEYED_EDU_TYPE = "k" -EDU_TYPE = "e" -FAILURE_TYPE = "f" -DEVICE_MESSAGE_TYPE = "d" - - class FederationRemoteSendQueue(object): """A drop in replacement for TransactionQueue""" @@ -257,10 +252,10 @@ class FederationRemoteSendQueue(object): ) for (key, (dest, user_id)) in dest_user_ids: - rows.append((key, PRESENCE_TYPE, { - "destination": dest, - "state": self.presence_map[user_id].as_dict(), - })) + rows.append((key, PresenceRow( + destination=dest, + state=self.presence_map[user_id], + ))) # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() @@ -269,12 +264,10 @@ class FederationRemoteSendQueue(object): keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) for (pos, (destination, edu_key)) in keyed_edus: - rows.append( - (pos, KEYED_EDU_TYPE, { - "key": edu_key, - "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), - }) - ) + rows.append((pos, KeyedEduRow( + key=edu_key, + edu=self.keyed_edu[(destination, edu_key)], + ))) # Fetch changed edus keys = self.edus.keys() @@ -283,7 +276,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, edu.get_internal_dict())) + rows.append((pos, EduRow(edu))) # Fetch changed failures keys = self.failures.keys() @@ -292,10 +285,10 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: - rows.append((pos, FAILURE_TYPE, { - "destination": destination, - "failure": failure, - })) + rows.append((pos, FailureRow( + destination=destination, + failure=failure, + ))) # Fetch changed device messages keys = self.device_messages.keys() @@ -304,11 +297,212 @@ class FederationRemoteSendQueue(object): device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: - rows.append((pos, DEVICE_MESSAGE_TYPE, { - "destination": destination, - })) + rows.append((pos, DeviceRow( + destination=destination, + ))) # Sort rows based on pos rows.sort() - return rows + return [(pos, row.TypeId, row.to_data()) for pos, row in rows] + + +class BaseFederationRow(object): + TypeId = None + + @staticmethod + def from_data(data): + """Parse the data from the federation stream into a row. + """ + raise NotImplementedError() + + def to_data(self): + """Serialize this row to be sent over the federation stream + """ + raise NotImplementedError() + + def add_to_buffer(self, buff): + """Add this row to the appropriate field in the buffer ready for this + to be sent over federation. + + We use a buffer so that we can batch up events that have come in at + the same time and send them all at once. + + Args: + buff (BufferedToSend) + """ + raise NotImplementedError() + + +class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( + "destination", # str + "state", # UserPresenceState +))): + TypeId = "p" + + @staticmethod + def from_data(data): + return PresenceRow( + destination=data["destination"], + state=UserPresenceState.from_dict(data["state"]) + ) + + def to_data(self): + return { + "destination": self.destination, + "state": self.state.as_dict() + } + + def add_to_buffer(self, buff): + buff.presence.setdefault(self.destination, []).append(self.state) + + +class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( + "key", # tuple(str) - the edu key passed to send_edu + "edu", # Edu +))): + TypeId = "k" + + @staticmethod + def from_data(data): + return KeyedEduRow( + key=tuple(data["key"]), + edu=Edu(**data["edu"]), + ) + + def to_data(self): + return { + "key": self.key, + "edu": self.edu.get_internal_dict(), + } + + def add_to_buffer(self, buff): + buff.keyed_edus.setdefault( + self.edu.destination, {} + )[self.key] = self.edu + + +class EduRow(BaseFederationRow, namedtuple("EduRow", ( + "edu", # Edu +))): + TypeId = "e" + + @staticmethod + def from_data(data): + return EduRow(Edu(**data)) + + def to_data(self): + return self.edu.get_internal_dict() + + def add_to_buffer(self, buff): + buff.edus.setdefault(self.edu.destination, []).append(self.edu) + + +class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( + "destination", # str + "failure", +))): + TypeId = "f" + + @staticmethod + def from_data(data): + return FailureRow( + destination=data["destination"], + failure=data["failure"], + ) + + def to_data(self): + return { + "destination": self.destination, + "failure": self.failure, + } + + def add_to_buffer(self, buff): + buff.failures.setdefault(self.destination, []).append(self.failure) + + +class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( + "destination", # str +))): + TypeId = "d" + + @staticmethod + def from_data(data): + return DeviceRow(destination=data) + + def to_data(self): + return self.destination + + def add_to_buffer(self, buff): + buff.device_destinations.add(self.destination) + + +TypeToRow = { + Row.TypeId: Row + for Row in ( + PresenceRow, + KeyedEduRow, + EduRow, + FailureRow, + DeviceRow, + ) +} + + +BufferedToSend = namedtuple("BufferedToSend", ( + "presence", # dict of destination -> [UserPresenceState] + "keyed_edus", # dict of destination -> { key -> Edu } + "edus", # dict of destination -> [Edu] + "failures", # dict of destination -> [failures] + "device_destinations", # set of destinations +)) + + +def process_rows_for_federation(federation_sender, rows): + """Parse a list of rows from the federation stream and them send them out. + + Args: + federation_sender (TransactionQueue) + rows (list(FederationStreamRow)) + """ + + # The federation stream containis a bunch of different types of + # rows that need to be handled differently. We parse the rows, put + # them into the appropriate collection and then send them off. + + buff = BufferedToSend( + presence={}, + keyed_edus={}, + edus={}, + failures={}, + device_destinations=set(), + ) + + # Parse the rows in the stream and add to the buffer + for row in rows: + RowType = TypeToRow[row.type] + parsed_row = RowType.from_data(row.data) + parsed_row.add_to_buffer(buff) + + # We've finished collecting, send everything off + for destination, states in buff.presence.iteritems(): + federation_sender.send_presence(destination, states) + + for destination, edu_map in buff.keyed_edus.iteritems(): + for key, edu in edu_map.items(): + federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=key, + ) + + for destination, edu_list in buff.edus.iteritems(): + for edu in edu_list: + federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=None, + ) + + for destination, failure_list in buff.failures.iteritems(): + for failure in failure_list: + federation_sender.send_failure(destination, failure) + + for destination in buff.device_destinations: + federation_sender.send_device_messages(destination) -- cgit 1.4.1 From 29574fd5b3537cc272a4d792669b8d5be2a92b6f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 16:48:30 +0100 Subject: Reduce federation presence replication traffic This is mainly done by moving the calculation of where to send presence updates from the presence handler to the transaction queue, so we only need to send the presence event (and not the destinations) across the replication connection. Before we were duplicating by sending the full state across once per destination. --- synapse/app/federation_sender.py | 12 ++++ synapse/app/synchrotron.py | 6 +- synapse/federation/send_queue.py | 47 ++++++-------- synapse/federation/transaction_queue.py | 99 ++++++++++++++++++++++++++--- synapse/handlers/presence.py | 54 ++++------------ synapse/replication/slave/storage/events.py | 1 + 6 files changed, 139 insertions(+), 80 deletions(-) (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 477e16e0fa..49efb602bc 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -32,6 +32,7 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine +from synapse.storage.presence import PresenceStore from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -80,6 +81,17 @@ class FederationSenderSlaveStore( return rows[0][0] if rows else -1 + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + # This is fine since in practice nobody uses the presence list stuff... + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + get_presence_list_observers_accepted = PresenceStore.__dict__[ + "get_presence_list_observers_accepted" + ] + class FederationSenderServer(HomeServer): def get_db_conn(self, run_new_connection=True): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d39e3161fe..7b6f82abdc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -206,10 +206,8 @@ class SynchrotronPresence(object): @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): - parties = yield self._get_interested_parties( - states, calculate_remote_hosts=False - ) - room_ids_to_states, users_to_states, _ = parties + parties = yield self._get_interested_parties(states) + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 748548bbe2..a12c18f4df 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -53,6 +53,7 @@ class FederationRemoteSendQueue(object): self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id self.presence_map = {} self.presence_changed = sorteddict() @@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object): del self.presence_changed[key] user_ids = set( - user_id for uids in self.presence_changed.values() for _, user_id in uids + user_id + for uids in self.presence_changed.itervalues() + for user_id in uids ) to_del = [ @@ -187,18 +190,14 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() - def send_presence(self, destination, states): + def send_presence(self, states): """As per TransactionQueue""" pos = self._next_pos() - self.presence_map.update({ - state.user_id: state - for state in states - }) + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - self.presence_changed[pos] = [ - (destination, state.user_id) for state in states - ] + self.presence_map.update({state.user_id: state for state in local_states}) + self.presence_changed[pos] = [state.user_id for state in local_states] self.notifier.on_new_replication_data() @@ -251,15 +250,14 @@ class FederationRemoteSendQueue(object): keys = self.presence_changed.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - dest_user_ids = set( - (pos, dest_user_id) + dest_user_ids = [ + (pos, user_id) for pos in keys[i:j] - for dest_user_id in self.presence_changed[pos] - ) + for user_id in self.presence_changed[pos] + ] - for (key, (dest, user_id)) in dest_user_ids: + for (key, user_id) in dest_user_ids: rows.append((key, PresenceRow( - destination=dest, state=self.presence_map[user_id], ))) @@ -354,7 +352,6 @@ class BaseFederationRow(object): class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( - "destination", # str "state", # UserPresenceState ))): TypeId = "p" @@ -362,18 +359,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( @staticmethod def from_data(data): return PresenceRow( - destination=data["destination"], - state=UserPresenceState.from_dict(data["state"]) + state=UserPresenceState.from_dict(data) ) def to_data(self): - return { - "destination": self.destination, - "state": self.state.as_dict() - } + return self.state.as_dict() def add_to_buffer(self, buff): - buff.presence.setdefault(self.destination, []).append(self.state) + buff.presence.append(self.state) class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( @@ -469,7 +462,7 @@ TypeToRow = { ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( - "presence", # dict of destination -> [UserPresenceState] + "presence", # list(UserPresenceState) "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] "failures", # dict of destination -> [failures] @@ -491,7 +484,7 @@ def process_rows_for_federation(transaction_queue, rows): # them into the appropriate collection and then send them off. buff = ParsedFederationStreamData( - presence={}, + presence=[], keyed_edus={}, edus={}, failures={}, @@ -508,8 +501,8 @@ def process_rows_for_federation(transaction_queue, rows): parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) - for destination, states in buff.presence.iteritems(): - transaction_queue.send_presence(destination, states) + if buff.presence: + transaction_queue.send_presence(buff.presence) for destination, edu_map in buff.keyed_edus.iteritems(): for key, edu in edu_map.items(): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c27ce7c5f3..fd9e1fa01c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -21,7 +21,7 @@ from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor -from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id @@ -78,6 +78,7 @@ class TransactionQueue(object): self.pending_edus_by_dest = edus = {} # Presence needs to be separate as we send single aggragate EDUs + self.pending_presence = {} self.pending_presence_by_dest = presence = {} self.pending_edus_keyed_by_dest = edus_keyed = {} @@ -113,6 +114,8 @@ class TransactionQueue(object): self._is_processing = False self._last_poked_id = -1 + self._processing_pending_presence = False + def can_send_to(self, destination): """Can we send messages to the given server? @@ -224,17 +227,95 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def send_presence(self, destination, states): - if not self.can_send_to(destination): + @preserve_fn + @defer.inlineCallbacks + def send_presence(self, states): + """Send the new presence states to the appropriate destinations. + + Args: + states (list(UserPresenceState)) + """ + + # First we queue up the new presence by user ID, so multiple presence + # updates in quick successtion are correctly handled + self.pending_presence.update({state.user_id: state for state in states}) + + # We then handle the new pending presence in batches, first figuring + # out the destinations we need to send each state to and then poking it + # to attempt a new transaction. We linearize this so that we don't + # accidentally mess up the ordering and send multiple presence updates + # in the wrong order + if self._processing_pending_presence: return - self.pending_presence_by_dest.setdefault(destination, {}).update({ - state.user_id: state for state in states - }) + self._processing_pending_presence = True + try: + while True: + states = self.pending_presence + self.pending_presence = {} - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + if not states: + break + + yield self._process_presence_inner(states) + finally: + self._processing_pending_presence = False + + @measure_func("txnqueue._process_presence") + @defer.inlineCallbacks + def _process_presence_inner(self, states): + """Given a list of states populate self.pending_presence_by_dest and + poke to send a new transaction to each destination + + Args: + states (list(UserPresenceState)) + """ + # First we look up the rooms each user is in (as well as any explicit + # subscriptions), then for each distinct room we look up the remote + # hosts in those rooms. + room_ids_to_states = {} + users_to_states = {} + for state in states.itervalues(): + room_ids = yield self.store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield self.store.get_presence_list_observers_accepted( + state.user_id, + ) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + hosts_and_states = [] + for room_id, states in room_ids_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + hosts = yield self.store.get_hosts_in_room(room_id) + hosts_and_states.append((hosts, local_states)) + + for user_id, states in users_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + host = get_domain_from_id(user_id) + hosts_and_states.append(([host], local_states)) + + # And now finally queue up new transactions + for destinations, states in hosts_and_states: + for destination in destinations: + if not self.can_send_to(destination): + continue + + self.pending_presence_by_dest.setdefault( + destination, {} + ).update({ + state.user_id: state for state in states + }) + + preserve_fn(self._attempt_new_transaction)(destination) def send_edu(self, destination, edu_type, content, key=None): edu = Edu( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9ed5af3cb4..c1c0dd4d3d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -318,11 +318,7 @@ class PresenceHandler(object): if to_federation_ping: federation_presence_out_counter.inc_by(len(to_federation_ping)) - _, _, hosts_to_states = yield self._get_interested_parties( - to_federation_ping.values() - ) - - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(to_federation_ping.values()) def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -615,12 +611,12 @@ class PresenceHandler(object): defer.returnValue(states) @defer.inlineCallbacks - def _get_interested_parties(self, states, calculate_remote_hosts=True): + def _get_interested_parties(self, states): """Given a list of states return which entities (rooms, users, servers) are interested in the given states. Returns: - 3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`, + 2-tuple: `(room_ids_to_states, users_to_states)`, with each item being a dict of `entity_name` -> `[UserPresenceState]` """ room_ids_to_states = {} @@ -637,30 +633,10 @@ class PresenceHandler(object): # Always notify self users_to_states.setdefault(state.user_id, []).append(state) - hosts_to_states = {} - if calculate_remote_hosts: - for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - hosts = yield self.store.get_hosts_in_room(room_id) - - for host in hosts: - hosts_to_states.setdefault(host, []).extend(local_states) - - for user_id, states in users_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - host = get_domain_from_id(user_id) - hosts_to_states.setdefault(host, []).extend(local_states) - # TODO: de-dup hosts_to_states, as a single host might have multiple # of same presence - defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states)) + defer.returnValue((room_ids_to_states, users_to_states)) @defer.inlineCallbacks def _persist_and_notify(self, states): @@ -670,33 +646,32 @@ class PresenceHandler(object): stream_id, max_token = yield self.store.update_presence(states) parties = yield self._get_interested_parties(states) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(states) @defer.inlineCallbacks def notify_for_states(self, state, stream_id): parties = yield self._get_interested_parties([state]) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - def _push_to_remotes(self, hosts_to_states): + def _push_to_remotes(self, states): """Sends state updates to remote servers. Args: - hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]` + hosts_to_states (list): list(state) """ - for host, states in hosts_to_states.items(): - self.federation.send_presence(host, states) + self.federation.send_presence(states) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -837,14 +812,13 @@ class PresenceHandler(object): if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) - hosts = set(get_domain_from_id(u) for u in user_ids) - self._push_to_remotes({host: (state,) for host in hosts}) + self._push_to_remotes([state]) else: user_ids = filter(self.is_mine_id, user_ids) states = yield self.current_state_for_users(user_ids) - self._push_to_remotes({user.domain: states.values()}) + self._push_to_remotes(states.values()) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 5fd47706ef..4ca1e5aa8c 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -71,6 +71,7 @@ class SlavedEventStore(BaseSlavedStore): # to reach inside the __dict__ to extract them. get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"] + get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"] get_users_who_share_room_with_user = ( RoomMemberStore.__dict__["get_users_who_share_room_with_user"] ) -- cgit 1.4.1 From 9c712a366fbbba90dbe18f7246c279d3cbb1cf10 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 16:07:33 +0100 Subject: Move get_presence_list_* to SlaveStore --- synapse/app/federation_sender.py | 15 ++------------- synapse/app/synchrotron.py | 12 +----------- synapse/replication/slave/storage/presence.py | 10 ++++++++++ 3 files changed, 13 insertions(+), 24 deletions(-) (limited to 'synapse/app/federation_sender.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 49efb602bc..e51a69074d 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -28,11 +28,11 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine -from synapse.storage.presence import PresenceStore from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -56,7 +56,7 @@ logger = logging.getLogger("synapse.app.appservice") class FederationSenderSlaveStore( SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, - SlavedRegistrationStore, SlavedDeviceStore, + SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore, ): def __init__(self, db_conn, hs): super(FederationSenderSlaveStore, self).__init__(db_conn, hs) @@ -81,17 +81,6 @@ class FederationSenderSlaveStore( return rows[0][0] if rows else -1 - # XXX: This is a bit broken because we don't persist the accepted list in a - # way that can be replicated. This means that we don't have a way to - # invalidate the cache correctly. - # This is fine since in practice nobody uses the presence list stuff... - get_presence_list_accepted = PresenceStore.__dict__[ - "get_presence_list_accepted" - ] - get_presence_list_observers_accepted = PresenceStore.__dict__[ - "get_presence_list_observers_accepted" - ] - class FederationSenderServer(HomeServer): def get_db_conn(self, run_new_connection=True): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e3fbf02c9c..13c00ef2ba 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -44,7 +44,7 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine -from synapse.storage.presence import PresenceStore, UserPresenceState +from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -89,16 +89,6 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["did_forget"] ) - # XXX: This is a bit broken because we don't persist the accepted list in a - # way that can be replicated. This means that we don't have a way to - # invalidate the cache correctly. - get_presence_list_accepted = PresenceStore.__dict__[ - "get_presence_list_accepted" - ] - get_presence_list_observers_accepted = PresenceStore.__dict__[ - "get_presence_list_observers_accepted" - ] - UPDATE_SYNCING_USERS_MS = 10 * 1000 diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index dffc80adc3..cfb9280181 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -39,6 +39,16 @@ class SlavedPresenceStore(BaseSlavedStore): _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"] get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"] + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + get_presence_list_observers_accepted = PresenceStore.__dict__[ + "get_presence_list_observers_accepted" + ] + def get_current_presence_token(self): return self._presence_id_gen.get_current_token() -- cgit 1.4.1