From 3a1f3f838862cfd2486773079d01e30c2237e8c2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 16:32:07 +0100 Subject: Change slave storage to use new replication interface As the TCP replication uses a slightly different API and streams than the HTTP replication. This breaks HTTP replication. --- synapse/replication/slave/storage/_base.py | 31 +++--------- synapse/replication/slave/storage/account_data.py | 49 +++++++------------ synapse/replication/slave/storage/deviceinbox.py | 23 ++++----- synapse/replication/slave/storage/devices.py | 24 ++++------ synapse/replication/slave/storage/events.py | 57 +++++++---------------- synapse/replication/slave/storage/presence.py | 19 ++++---- synapse/replication/slave/storage/push_rule.py | 23 ++++----- synapse/replication/slave/storage/pushers.py | 16 +++---- synapse/replication/slave/storage/receipts.py | 24 +++++----- synapse/replication/slave/storage/room.py | 11 +++-- 10 files changed, 106 insertions(+), 171 deletions(-) (limited to 'synapse/replication/slave') diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index ab133db872..b962641166 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -15,7 +15,6 @@ from synapse.storage._base import SQLBaseStore from synapse.storage.engines import PostgresEngine -from twisted.internet import defer from ._slaved_id_tracker import SlavedIdTracker @@ -34,8 +33,7 @@ class BaseSlavedStore(SQLBaseStore): else: self._cache_id_gen = None - self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache" - self.http_client = hs.get_simple_http_client() + self.hs = hs def stream_positions(self): pos = {} @@ -43,35 +41,20 @@ class BaseSlavedStore(SQLBaseStore): pos["caches"] = self._cache_id_gen.get_current_token() return pos - def process_replication(self, result): - stream = result.get("caches") - if stream: - for row in stream["rows"]: - ( - position, cache_func, keys, invalidation_ts, - ) = row - + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "caches": + self._cache_id_gen.advance(token) + for row in rows: try: - getattr(self, cache_func).invalidate(tuple(keys)) + getattr(self, row.cache_func).invalidate(tuple(row.keys)) except AttributeError: # We probably haven't pulled in the cache in this worker, # which is fine. pass - self._cache_id_gen.advance(int(stream["position"])) - return defer.succeed(None) def _invalidate_cache_and_stream(self, txn, cache_func, keys): txn.call_after(cache_func.invalidate, keys) txn.call_after(self._send_invalidation_poke, cache_func, keys) - @defer.inlineCallbacks def _send_invalidation_poke(self, cache_func, keys): - try: - yield self.http_client.post_json_get_json(self.expire_cache_url, { - "invalidate": [{ - "name": cache_func.__name__, - "keys": list(keys), - }] - }) - except: - logger.exception("Failed to poke on expire_cache") + self.hs.get_tcp_replication().send_invalidate_cache(cache_func, keys) diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index 77c64722c7..efbd87918e 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -69,38 +69,25 @@ class SlavedAccountDataStore(BaseSlavedStore): result["tag_account_data"] = position return result - def process_replication(self, result): - stream = result.get("user_account_data") - if stream: - self._account_data_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - position, user_id, data_type = row[:3] - self.get_global_account_data_by_type_for_user.invalidate( - (data_type, user_id,) - ) - self.get_account_data_for_user.invalidate((user_id,)) + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "tag_account_data": + self._account_data_id_gen.advance(token) + for row in rows: + self.get_tags_for_user.invalidate((row.user_id,)) self._account_data_stream_cache.entity_has_changed( - user_id, position + row.user_id, token ) - - stream = result.get("room_account_data") - if stream: - self._account_data_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - position, user_id = row[:2] - self.get_account_data_for_user.invalidate((user_id,)) + elif stream_name == "account_data": + self._account_data_id_gen.advance(token) + for row in rows: + if not row.room_id: + self.get_global_account_data_by_type_for_user.invalidate( + (row.data_type, row.user_id,) + ) + self.get_account_data_for_user.invalidate((row.user_id,)) self._account_data_stream_cache.entity_has_changed( - user_id, position + row.user_id, token ) - - stream = result.get("tag_account_data") - if stream: - self._account_data_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - position, user_id = row[:2] - self.get_tags_for_user.invalidate((user_id,)) - self._account_data_stream_cache.entity_has_changed( - user_id, position - ) - - return super(SlavedAccountDataStore, self).process_replication(result) + return super(SlavedAccountDataStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index f9102e0d89..6f3fb64770 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -53,21 +53,18 @@ class SlavedDeviceInboxStore(BaseSlavedStore): result["to_device"] = self._device_inbox_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("to_device") - if stream: - self._device_inbox_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - stream_id = row[0] - entity = row[1] - - if entity.startswith("@"): + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "to_device": + self._device_inbox_id_gen.advance(token) + for row in rows: + if row.entity.startswith("@"): self._device_inbox_stream_cache.entity_has_changed( - entity, stream_id + row.entity, token ) else: self._device_federation_outbox_stream_cache.entity_has_changed( - entity, stream_id + row.entity, token ) - - return super(SlavedDeviceInboxStore, self).process_replication(result) + return super(SlavedDeviceInboxStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index ca46aa17b6..4d4a435471 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -51,22 +51,18 @@ class SlavedDeviceStore(BaseSlavedStore): result["device_lists"] = self._device_list_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("device_lists") - if stream: - self._device_list_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - stream_id = row[0] - user_id = row[1] - destination = row[2] - + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "device_lists": + self._device_list_id_gen.advance(token) + for row in rows: self._device_list_stream_cache.entity_has_changed( - user_id, stream_id + row.user_id, token ) - if destination: + if row.destination: self._device_list_federation_stream_cache.entity_has_changed( - destination, stream_id + row.destination, token ) - - return super(SlavedDeviceStore, self).process_replication(result) + return super(SlavedDeviceStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index d4db1e452e..5fd47706ef 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -201,48 +201,25 @@ class SlavedEventStore(BaseSlavedStore): result["backfill"] = -self._backfill_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("events") - if stream: - self._stream_id_gen.advance(int(stream["position"])) - - if stream["rows"]: - logger.info("Got %d event rows", len(stream["rows"])) - - for row in stream["rows"]: - self._process_replication_row( - row, backfilled=False, + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "events": + self._stream_id_gen.advance(token) + for row in rows: + self.invalidate_caches_for_event( + token, row.event_id, row.room_id, row.type, row.state_key, + row.redacts, + backfilled=False, ) - - stream = result.get("backfill") - if stream: - self._backfill_id_gen.advance(-int(stream["position"])) - for row in stream["rows"]: - self._process_replication_row( - row, backfilled=True, + elif stream_name == "backfill": + self._backfill_id_gen.advance(-token) + for row in rows: + self.invalidate_caches_for_event( + -token, row.event_id, row.room_id, row.type, row.state_key, + row.redacts, + backfilled=True, ) - - stream = result.get("forward_ex_outliers") - if stream: - self._stream_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - event_id = row[1] - self._invalidate_get_event_cache(event_id) - - stream = result.get("backward_ex_outliers") - if stream: - self._backfill_id_gen.advance(-int(stream["position"])) - for row in stream["rows"]: - event_id = row[1] - self._invalidate_get_event_cache(event_id) - - return super(SlavedEventStore, self).process_replication(result) - - def _process_replication_row(self, row, backfilled): - stream_ordering = row[0] if not backfilled else -row[0] - self.invalidate_caches_for_event( - stream_ordering, row[1], row[2], row[3], row[4], row[5], - backfilled=backfilled, + return super(SlavedEventStore, self).process_replication_rows( + stream_name, token, rows ) def invalidate_caches_for_event(self, stream_ordering, event_id, room_id, diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index e4a2414d78..dffc80adc3 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -48,15 +48,14 @@ class SlavedPresenceStore(BaseSlavedStore): result["presence"] = position return result - def process_replication(self, result): - stream = result.get("presence") - if stream: - self._presence_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - position, user_id = row[:2] + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "presence": + self._presence_id_gen.advance(token) + for row in rows: self.presence_stream_cache.entity_has_changed( - user_id, position + row.user_id, token ) - self._get_presence_for_user.invalidate((user_id,)) - - return super(SlavedPresenceStore, self).process_replication(result) + self._get_presence_for_user.invalidate((row.user_id,)) + return super(SlavedPresenceStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index 21ceb0213a..83e880fdd2 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -50,18 +50,15 @@ class SlavedPushRuleStore(SlavedEventStore): result["push_rules"] = self._push_rules_stream_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("push_rules") - if stream: - for row in stream["rows"]: - position = row[0] - user_id = row[2] - self.get_push_rules_for_user.invalidate((user_id,)) - self.get_push_rules_enabled_for_user.invalidate((user_id,)) + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "push_rules": + self._push_rules_stream_id_gen.advance(token) + for row in rows: + self.get_push_rules_for_user.invalidate((row.user_id,)) + self.get_push_rules_enabled_for_user.invalidate((row.user_id,)) self.push_rules_stream_cache.entity_has_changed( - user_id, position + row.user_id, token ) - - self._push_rules_stream_id_gen.advance(int(stream["position"])) - - return super(SlavedPushRuleStore, self).process_replication(result) + return super(SlavedPushRuleStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py index d88206b3bb..4e8d68ece9 100644 --- a/synapse/replication/slave/storage/pushers.py +++ b/synapse/replication/slave/storage/pushers.py @@ -40,13 +40,9 @@ class SlavedPusherStore(BaseSlavedStore): result["pushers"] = self._pushers_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("pushers") - if stream: - self._pushers_id_gen.advance(int(stream["position"])) - - stream = result.get("deleted_pushers") - if stream: - self._pushers_id_gen.advance(int(stream["position"])) - - return super(SlavedPusherStore, self).process_replication(result) + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "pushers": + self._pushers_id_gen.advance(token) + return super(SlavedPusherStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index ac9662d399..b371574ece 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -65,20 +65,22 @@ class SlavedReceiptsStore(BaseSlavedStore): result["receipts"] = self._receipts_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("receipts") - if stream: - self._receipts_id_gen.advance(int(stream["position"])) - for row in stream["rows"]: - position, room_id, receipt_type, user_id = row[:4] - self.invalidate_caches_for_receipt(room_id, receipt_type, user_id) - self._receipts_stream_cache.entity_has_changed(room_id, position) - - return super(SlavedReceiptsStore, self).process_replication(result) - def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): self.get_receipts_for_user.invalidate((user_id, receipt_type)) self.get_linearized_receipts_for_room.invalidate_many((room_id,)) self.get_last_receipt_event_id_for_user.invalidate( (user_id, room_id, receipt_type) ) + + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "receipts": + self._receipts_id_gen.advance(token) + for row in rows: + self.invalidate_caches_for_receipt( + row.room_id, row.receipt_type, row.user_id + ) + self._receipts_stream_cache.entity_has_changed(row.room_id, token) + + return super(SlavedReceiptsStore, self).process_replication_rows( + stream_name, token, rows + ) diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py index 6df9a25ef3..f510384033 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py @@ -46,9 +46,10 @@ class RoomStore(BaseSlavedStore): result["public_rooms"] = self._public_room_id_gen.get_current_token() return result - def process_replication(self, result): - stream = result.get("public_rooms") - if stream: - self._public_room_id_gen.advance(int(stream["position"])) + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "public_rooms": + self._public_room_id_gen.advance(token) - return super(RoomStore, self).process_replication(result) + return super(RoomStore, self).process_replication_rows( + stream_name, token, rows + ) -- cgit 1.4.1 From 29574fd5b3537cc272a4d792669b8d5be2a92b6f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 16:48:30 +0100 Subject: Reduce federation presence replication traffic This is mainly done by moving the calculation of where to send presence updates from the presence handler to the transaction queue, so we only need to send the presence event (and not the destinations) across the replication connection. Before we were duplicating by sending the full state across once per destination. --- synapse/app/federation_sender.py | 12 ++++ synapse/app/synchrotron.py | 6 +- synapse/federation/send_queue.py | 47 ++++++-------- synapse/federation/transaction_queue.py | 99 ++++++++++++++++++++++++++--- synapse/handlers/presence.py | 54 ++++------------ synapse/replication/slave/storage/events.py | 1 + 6 files changed, 139 insertions(+), 80 deletions(-) (limited to 'synapse/replication/slave') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 477e16e0fa..49efb602bc 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -32,6 +32,7 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine +from synapse.storage.presence import PresenceStore from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -80,6 +81,17 @@ class FederationSenderSlaveStore( return rows[0][0] if rows else -1 + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + # This is fine since in practice nobody uses the presence list stuff... + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + get_presence_list_observers_accepted = PresenceStore.__dict__[ + "get_presence_list_observers_accepted" + ] + class FederationSenderServer(HomeServer): def get_db_conn(self, run_new_connection=True): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d39e3161fe..7b6f82abdc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -206,10 +206,8 @@ class SynchrotronPresence(object): @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): - parties = yield self._get_interested_parties( - states, calculate_remote_hosts=False - ) - room_ids_to_states, users_to_states, _ = parties + parties = yield self._get_interested_parties(states) + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 748548bbe2..a12c18f4df 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -53,6 +53,7 @@ class FederationRemoteSendQueue(object): self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id self.presence_map = {} self.presence_changed = sorteddict() @@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object): del self.presence_changed[key] user_ids = set( - user_id for uids in self.presence_changed.values() for _, user_id in uids + user_id + for uids in self.presence_changed.itervalues() + for user_id in uids ) to_del = [ @@ -187,18 +190,14 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() - def send_presence(self, destination, states): + def send_presence(self, states): """As per TransactionQueue""" pos = self._next_pos() - self.presence_map.update({ - state.user_id: state - for state in states - }) + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - self.presence_changed[pos] = [ - (destination, state.user_id) for state in states - ] + self.presence_map.update({state.user_id: state for state in local_states}) + self.presence_changed[pos] = [state.user_id for state in local_states] self.notifier.on_new_replication_data() @@ -251,15 +250,14 @@ class FederationRemoteSendQueue(object): keys = self.presence_changed.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - dest_user_ids = set( - (pos, dest_user_id) + dest_user_ids = [ + (pos, user_id) for pos in keys[i:j] - for dest_user_id in self.presence_changed[pos] - ) + for user_id in self.presence_changed[pos] + ] - for (key, (dest, user_id)) in dest_user_ids: + for (key, user_id) in dest_user_ids: rows.append((key, PresenceRow( - destination=dest, state=self.presence_map[user_id], ))) @@ -354,7 +352,6 @@ class BaseFederationRow(object): class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( - "destination", # str "state", # UserPresenceState ))): TypeId = "p" @@ -362,18 +359,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( @staticmethod def from_data(data): return PresenceRow( - destination=data["destination"], - state=UserPresenceState.from_dict(data["state"]) + state=UserPresenceState.from_dict(data) ) def to_data(self): - return { - "destination": self.destination, - "state": self.state.as_dict() - } + return self.state.as_dict() def add_to_buffer(self, buff): - buff.presence.setdefault(self.destination, []).append(self.state) + buff.presence.append(self.state) class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( @@ -469,7 +462,7 @@ TypeToRow = { ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( - "presence", # dict of destination -> [UserPresenceState] + "presence", # list(UserPresenceState) "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] "failures", # dict of destination -> [failures] @@ -491,7 +484,7 @@ def process_rows_for_federation(transaction_queue, rows): # them into the appropriate collection and then send them off. buff = ParsedFederationStreamData( - presence={}, + presence=[], keyed_edus={}, edus={}, failures={}, @@ -508,8 +501,8 @@ def process_rows_for_federation(transaction_queue, rows): parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) - for destination, states in buff.presence.iteritems(): - transaction_queue.send_presence(destination, states) + if buff.presence: + transaction_queue.send_presence(buff.presence) for destination, edu_map in buff.keyed_edus.iteritems(): for key, edu in edu_map.items(): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c27ce7c5f3..fd9e1fa01c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -21,7 +21,7 @@ from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor -from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id @@ -78,6 +78,7 @@ class TransactionQueue(object): self.pending_edus_by_dest = edus = {} # Presence needs to be separate as we send single aggragate EDUs + self.pending_presence = {} self.pending_presence_by_dest = presence = {} self.pending_edus_keyed_by_dest = edus_keyed = {} @@ -113,6 +114,8 @@ class TransactionQueue(object): self._is_processing = False self._last_poked_id = -1 + self._processing_pending_presence = False + def can_send_to(self, destination): """Can we send messages to the given server? @@ -224,17 +227,95 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def send_presence(self, destination, states): - if not self.can_send_to(destination): + @preserve_fn + @defer.inlineCallbacks + def send_presence(self, states): + """Send the new presence states to the appropriate destinations. + + Args: + states (list(UserPresenceState)) + """ + + # First we queue up the new presence by user ID, so multiple presence + # updates in quick successtion are correctly handled + self.pending_presence.update({state.user_id: state for state in states}) + + # We then handle the new pending presence in batches, first figuring + # out the destinations we need to send each state to and then poking it + # to attempt a new transaction. We linearize this so that we don't + # accidentally mess up the ordering and send multiple presence updates + # in the wrong order + if self._processing_pending_presence: return - self.pending_presence_by_dest.setdefault(destination, {}).update({ - state.user_id: state for state in states - }) + self._processing_pending_presence = True + try: + while True: + states = self.pending_presence + self.pending_presence = {} - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + if not states: + break + + yield self._process_presence_inner(states) + finally: + self._processing_pending_presence = False + + @measure_func("txnqueue._process_presence") + @defer.inlineCallbacks + def _process_presence_inner(self, states): + """Given a list of states populate self.pending_presence_by_dest and + poke to send a new transaction to each destination + + Args: + states (list(UserPresenceState)) + """ + # First we look up the rooms each user is in (as well as any explicit + # subscriptions), then for each distinct room we look up the remote + # hosts in those rooms. + room_ids_to_states = {} + users_to_states = {} + for state in states.itervalues(): + room_ids = yield self.store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield self.store.get_presence_list_observers_accepted( + state.user_id, + ) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + hosts_and_states = [] + for room_id, states in room_ids_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + hosts = yield self.store.get_hosts_in_room(room_id) + hosts_and_states.append((hosts, local_states)) + + for user_id, states in users_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + host = get_domain_from_id(user_id) + hosts_and_states.append(([host], local_states)) + + # And now finally queue up new transactions + for destinations, states in hosts_and_states: + for destination in destinations: + if not self.can_send_to(destination): + continue + + self.pending_presence_by_dest.setdefault( + destination, {} + ).update({ + state.user_id: state for state in states + }) + + preserve_fn(self._attempt_new_transaction)(destination) def send_edu(self, destination, edu_type, content, key=None): edu = Edu( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9ed5af3cb4..c1c0dd4d3d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -318,11 +318,7 @@ class PresenceHandler(object): if to_federation_ping: federation_presence_out_counter.inc_by(len(to_federation_ping)) - _, _, hosts_to_states = yield self._get_interested_parties( - to_federation_ping.values() - ) - - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(to_federation_ping.values()) def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -615,12 +611,12 @@ class PresenceHandler(object): defer.returnValue(states) @defer.inlineCallbacks - def _get_interested_parties(self, states, calculate_remote_hosts=True): + def _get_interested_parties(self, states): """Given a list of states return which entities (rooms, users, servers) are interested in the given states. Returns: - 3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`, + 2-tuple: `(room_ids_to_states, users_to_states)`, with each item being a dict of `entity_name` -> `[UserPresenceState]` """ room_ids_to_states = {} @@ -637,30 +633,10 @@ class PresenceHandler(object): # Always notify self users_to_states.setdefault(state.user_id, []).append(state) - hosts_to_states = {} - if calculate_remote_hosts: - for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - hosts = yield self.store.get_hosts_in_room(room_id) - - for host in hosts: - hosts_to_states.setdefault(host, []).extend(local_states) - - for user_id, states in users_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - host = get_domain_from_id(user_id) - hosts_to_states.setdefault(host, []).extend(local_states) - # TODO: de-dup hosts_to_states, as a single host might have multiple # of same presence - defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states)) + defer.returnValue((room_ids_to_states, users_to_states)) @defer.inlineCallbacks def _persist_and_notify(self, states): @@ -670,33 +646,32 @@ class PresenceHandler(object): stream_id, max_token = yield self.store.update_presence(states) parties = yield self._get_interested_parties(states) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(states) @defer.inlineCallbacks def notify_for_states(self, state, stream_id): parties = yield self._get_interested_parties([state]) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - def _push_to_remotes(self, hosts_to_states): + def _push_to_remotes(self, states): """Sends state updates to remote servers. Args: - hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]` + hosts_to_states (list): list(state) """ - for host, states in hosts_to_states.items(): - self.federation.send_presence(host, states) + self.federation.send_presence(states) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -837,14 +812,13 @@ class PresenceHandler(object): if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) - hosts = set(get_domain_from_id(u) for u in user_ids) - self._push_to_remotes({host: (state,) for host in hosts}) + self._push_to_remotes([state]) else: user_ids = filter(self.is_mine_id, user_ids) states = yield self.current_state_for_users(user_ids) - self._push_to_remotes({user.domain: states.values()}) + self._push_to_remotes(states.values()) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 5fd47706ef..4ca1e5aa8c 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -71,6 +71,7 @@ class SlavedEventStore(BaseSlavedStore): # to reach inside the __dict__ to extract them. get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"] + get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"] get_users_who_share_room_with_user = ( RoomMemberStore.__dict__["get_users_who_share_room_with_user"] ) -- cgit 1.4.1 From 9c712a366fbbba90dbe18f7246c279d3cbb1cf10 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 16:07:33 +0100 Subject: Move get_presence_list_* to SlaveStore --- synapse/app/federation_sender.py | 15 ++------------- synapse/app/synchrotron.py | 12 +----------- synapse/replication/slave/storage/presence.py | 10 ++++++++++ 3 files changed, 13 insertions(+), 24 deletions(-) (limited to 'synapse/replication/slave') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 49efb602bc..e51a69074d 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -28,11 +28,11 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine -from synapse.storage.presence import PresenceStore from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -56,7 +56,7 @@ logger = logging.getLogger("synapse.app.appservice") class FederationSenderSlaveStore( SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, - SlavedRegistrationStore, SlavedDeviceStore, + SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore, ): def __init__(self, db_conn, hs): super(FederationSenderSlaveStore, self).__init__(db_conn, hs) @@ -81,17 +81,6 @@ class FederationSenderSlaveStore( return rows[0][0] if rows else -1 - # XXX: This is a bit broken because we don't persist the accepted list in a - # way that can be replicated. This means that we don't have a way to - # invalidate the cache correctly. - # This is fine since in practice nobody uses the presence list stuff... - get_presence_list_accepted = PresenceStore.__dict__[ - "get_presence_list_accepted" - ] - get_presence_list_observers_accepted = PresenceStore.__dict__[ - "get_presence_list_observers_accepted" - ] - class FederationSenderServer(HomeServer): def get_db_conn(self, run_new_connection=True): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e3fbf02c9c..13c00ef2ba 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -44,7 +44,7 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine -from synapse.storage.presence import PresenceStore, UserPresenceState +from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -89,16 +89,6 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["did_forget"] ) - # XXX: This is a bit broken because we don't persist the accepted list in a - # way that can be replicated. This means that we don't have a way to - # invalidate the cache correctly. - get_presence_list_accepted = PresenceStore.__dict__[ - "get_presence_list_accepted" - ] - get_presence_list_observers_accepted = PresenceStore.__dict__[ - "get_presence_list_observers_accepted" - ] - UPDATE_SYNCING_USERS_MS = 10 * 1000 diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index dffc80adc3..cfb9280181 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -39,6 +39,16 @@ class SlavedPresenceStore(BaseSlavedStore): _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"] get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"] + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + get_presence_list_observers_accepted = PresenceStore.__dict__[ + "get_presence_list_observers_accepted" + ] + def get_current_presence_token(self): return self._presence_id_gen.get_current_token() -- cgit 1.4.1 From e4f34311166cc93ad26fc12969867e1db30b4a30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Apr 2017 13:27:38 +0100 Subject: Remove unused cache --- synapse/replication/slave/storage/events.py | 3 --- synapse/storage/state.py | 7 +------ 2 files changed, 1 insertion(+), 9 deletions(-) (limited to 'synapse/replication/slave') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 4ca1e5aa8c..ab48ff925e 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -102,9 +102,6 @@ class SlavedEventStore(BaseSlavedStore): _get_state_groups_from_groups_txn = ( DataStore._get_state_groups_from_groups_txn.__func__ ) - _get_state_group_from_group = ( - StateStore.__dict__["_get_state_group_from_group"] - ) get_recent_event_ids_for_room = ( StreamStore.__dict__["get_recent_event_ids_for_room"] ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index acd69944c4..927a936013 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -279,12 +279,7 @@ class StateStore(SQLBaseStore): return count - @cached(num_args=2, max_entries=100000, iterable=True) - def _get_state_group_from_group(self, group, types): - raise NotImplementedError() - - @cachedList(cached_method_name="_get_state_group_from_group", - list_name="groups", num_args=2, inlineCallbacks=True) + @defer.inlineCallbacks def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) """ -- cgit 1.4.1 From 9ac263ed1b6f5dee46e85fe42ddfe7f9239e9690 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 May 2017 14:29:03 +0100 Subject: Add new storage functions to slave store --- synapse/replication/slave/storage/events.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/replication/slave') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index ab48ff925e..fcaf58b93b 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -144,6 +144,9 @@ class SlavedEventStore(BaseSlavedStore): RoomMemberStore.__dict__["_get_joined_users_from_context"] ) + get_joined_hosts = DataStore.get_joined_hosts.__func__ + _get_joined_hosts = RoomMemberStore.__dict__["_get_joined_hosts"] + get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__ get_room_events_stream_for_rooms = ( DataStore.get_room_events_stream_for_rooms.__func__ -- cgit 1.4.1