diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/send_queue.py | 69 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 86 |
2 files changed, 110 insertions, 45 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index b952e59518..93e5acebc1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -53,18 +53,19 @@ class FederationRemoteSendQueue(object): self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id - self.presence_map = {} - self.presence_changed = sorteddict() + self.presence_map = {} # Pending presence map user_id -> UserPresenceState + self.presence_changed = sorteddict() # Stream position -> user_id - self.keyed_edu = {} - self.keyed_edu_changed = sorteddict() + self.keyed_edu = {} # (destination, key) -> EDU + self.keyed_edu_changed = sorteddict() # stream position -> (destination, key) - self.edus = sorteddict() + self.edus = sorteddict() # stream position -> Edu - self.failures = sorteddict() + self.failures = sorteddict() # stream position -> (destination, Failure) - self.device_messages = sorteddict() + self.device_messages = sorteddict() # stream position -> destination self.pos = 1 self.pos_time = sorteddict() @@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object): del self.presence_changed[key] user_ids = set( - user_id for uids in self.presence_changed.values() for _, user_id in uids + user_id + for uids in self.presence_changed.itervalues() + for user_id in uids ) to_del = [ @@ -187,18 +190,20 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() - def send_presence(self, destination, states): - """As per TransactionQueue""" + def send_presence(self, states): + """As per TransactionQueue + + Args: + states (list(UserPresenceState)) + """ pos = self._next_pos() - self.presence_map.update({ - state.user_id: state - for state in states - }) + # We only want to send presence for our own users, so lets always just + # filter here just in case. + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - self.presence_changed[pos] = [ - (destination, state.user_id) for state in states - ] + self.presence_map.update({state.user_id: state for state in local_states}) + self.presence_changed[pos] = [state.user_id for state in local_states] self.notifier.on_new_replication_data() @@ -251,15 +256,14 @@ class FederationRemoteSendQueue(object): keys = self.presence_changed.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - dest_user_ids = set( - (pos, dest_user_id) + dest_user_ids = [ + (pos, user_id) for pos in keys[i:j] - for dest_user_id in self.presence_changed[pos] - ) + for user_id in self.presence_changed[pos] + ] - for (key, (dest, user_id)) in dest_user_ids: + for (key, user_id) in dest_user_ids: rows.append((key, PresenceRow( - destination=dest, state=self.presence_map[user_id], ))) @@ -357,7 +361,6 @@ class BaseFederationRow(object): class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( - "destination", # str "state", # UserPresenceState ))): TypeId = "p" @@ -365,18 +368,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( @staticmethod def from_data(data): return PresenceRow( - destination=data["destination"], - state=UserPresenceState.from_dict(data["state"]) + state=UserPresenceState.from_dict(data) ) def to_data(self): - return { - "destination": self.destination, - "state": self.state.as_dict() - } + return self.state.as_dict() def add_to_buffer(self, buff): - buff.presence.setdefault(self.destination, []).append(self.state) + buff.presence.append(self.state) class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( @@ -487,7 +486,7 @@ TypeToRow = { ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( - "presence", # dict of destination -> [UserPresenceState] + "presence", # list(UserPresenceState) "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] "failures", # dict of destination -> [failures] @@ -509,7 +508,7 @@ def process_rows_for_federation(transaction_queue, rows): # them into the appropriate collection and then send them off. buff = ParsedFederationStreamData( - presence={}, + presence=[], keyed_edus={}, edus={}, failures={}, @@ -526,8 +525,8 @@ def process_rows_for_federation(transaction_queue, rows): parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) - for destination, states in buff.presence.iteritems(): - transaction_queue.send_presence(destination, states) + if buff.presence: + transaction_queue.send_presence(buff.presence) for destination, edu_map in buff.keyed_edus.iteritems(): for key, edu in edu_map.items(): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 0e43891647..dee387eb7f 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -21,11 +21,11 @@ from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor -from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id -from synapse.handlers.presence import format_user_presence_state +from synapse.handlers.presence import format_user_presence_state, get_interested_remotes import synapse.metrics import logging @@ -79,8 +79,18 @@ class TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = edus = {} - # Presence needs to be separate as we send single aggragate EDUs + # Map of user_id -> UserPresenceState for all the pending presence + # to be sent out by user_id. Entries here get processed and put in + # pending_presence_by_dest + self.pending_presence = {} + + # Map of destination -> user_id -> UserPresenceState of pending presence + # to be sent to each destinations self.pending_presence_by_dest = presence = {} + + # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered + # based on their key (e.g. typing events by room_id) + # Map of destination -> (edu_type, key) -> Edu self.pending_edus_keyed_by_dest = edus_keyed = {} metrics.register_callback( @@ -115,6 +125,8 @@ class TransactionQueue(object): self._is_processing = False self._last_poked_id = -1 + self._processing_pending_presence = False + def can_send_to(self, destination): """Can we send messages to the given server? @@ -226,17 +238,71 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def send_presence(self, destination, states): - if not self.can_send_to(destination): - return + @preserve_fn # the caller should not yield on this + @defer.inlineCallbacks + def send_presence(self, states): + """Send the new presence states to the appropriate destinations. - self.pending_presence_by_dest.setdefault(destination, {}).update({ + This actually queues up the presence states ready for sending and + triggers a background task to process them and send out the transactions. + + Args: + states (list(UserPresenceState)) + """ + + # First we queue up the new presence by user ID, so multiple presence + # updates in quick successtion are correctly handled + # We only want to send presence for our own users, so lets always just + # filter here just in case. + self.pending_presence.update({ state.user_id: state for state in states + if self.is_mine_id(state.user_id) }) - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + # We then handle the new pending presence in batches, first figuring + # out the destinations we need to send each state to and then poking it + # to attempt a new transaction. We linearize this so that we don't + # accidentally mess up the ordering and send multiple presence updates + # in the wrong order + if self._processing_pending_presence: + return + + self._processing_pending_presence = True + try: + while True: + states_map = self.pending_presence + self.pending_presence = {} + + if not states_map: + break + + yield self._process_presence_inner(states_map.values()) + finally: + self._processing_pending_presence = False + + @measure_func("txnqueue._process_presence") + @defer.inlineCallbacks + def _process_presence_inner(self, states): + """Given a list of states populate self.pending_presence_by_dest and + poke to send a new transaction to each destination + + Args: + states (list(UserPresenceState)) + """ + hosts_and_states = yield get_interested_remotes(self.store, states) + + for destinations, states in hosts_and_states: + for destination in destinations: + if not self.can_send_to(destination): + continue + + self.pending_presence_by_dest.setdefault( + destination, {} + ).update({ + state.user_id: state for state in states + }) + + preserve_fn(self._attempt_new_transaction)(destination) def send_edu(self, destination, edu_type, content, key=None): edu = Edu( |