diff options
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 74 |
1 files changed, 39 insertions, 35 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 4d96f026c6..fae8bea392 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -40,8 +40,7 @@ logger = logging.getLogger(__name__) sent_edus_counter = Counter( - "synapse_federation_client_sent_edus", - "Total number of EDUs successfully sent", + "synapse_federation_client_sent_edus", "Total number of EDUs successfully sent" ) sent_edus_by_type = Counter( @@ -61,6 +60,7 @@ class PerDestinationQueue(object): destination (str): the server_name of the destination that we are managing transmission for. """ + def __init__(self, hs, transaction_manager, destination): self._server_name = hs.hostname self._clock = hs.get_clock() @@ -71,17 +71,17 @@ class PerDestinationQueue(object): self.transmission_loop_running = False # a list of tuples of (pending pdu, order) - self._pending_pdus = [] # type: list[tuple[EventBase, int]] - self._pending_edus = [] # type: list[Edu] + self._pending_pdus = [] # type: list[tuple[EventBase, int]] + self._pending_edus = [] # type: list[Edu] # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered # based on their key (e.g. typing events by room_id) # Map of (edu_type, key) -> Edu - self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu] + self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu] # Map of user_id -> UserPresenceState of pending presence to be sent to this # destination - self._pending_presence = {} # type: dict[str, UserPresenceState] + self._pending_presence = {} # type: dict[str, UserPresenceState] # room_id -> receipt_type -> user_id -> receipt_dict self._pending_rrs = {} @@ -123,9 +123,7 @@ class PerDestinationQueue(object): Args: states (iterable[UserPresenceState]): presence to send """ - self._pending_presence.update({ - state.user_id: state for state in states - }) + self._pending_presence.update({state.user_id: state for state in states}) self.attempt_new_transaction() def queue_read_receipt(self, receipt): @@ -135,14 +133,9 @@ class PerDestinationQueue(object): Args: receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued """ - self._pending_rrs.setdefault( - receipt.room_id, {}, - ).setdefault( + self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( receipt.receipt_type, {} - )[receipt.user_id] = { - "event_ids": receipt.event_ids, - "data": receipt.data, - } + )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data} def flush_read_receipts_for_room(self, room_id): # if we don't have any read-receipts for this room, it may be that we've already @@ -173,10 +166,7 @@ class PerDestinationQueue(object): # request at which point pending_pdus just keeps growing. # we need application-layer timeouts of some flavour of these # requests - logger.debug( - "TX [%s] Transaction already in progress", - self._destination - ) + logger.debug("TX [%s] Transaction already in progress", self._destination) return logger.debug("TX [%s] Starting transaction loop", self._destination) @@ -243,14 +233,22 @@ class PerDestinationQueue(object): ) pending_edus.extend(device_message_edus) - pending_edus.extend(self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))) - while len(pending_edus) < MAX_EDUS_PER_TRANSACTION and self._pending_edus_keyed: + pending_edus.extend( + self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)) + ) + while ( + len(pending_edus) < MAX_EDUS_PER_TRANSACTION + and self._pending_edus_keyed + ): _, val = self._pending_edus_keyed.popitem() pending_edus.append(val) if pending_pdus: - logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", - self._destination, len(pending_pdus)) + logger.debug( + "TX [%s] len(pending_pdus_by_dest[dest]) = %d", + self._destination, + len(pending_pdus), + ) if not pending_pdus and not pending_edus: logger.debug("TX [%s] Nothing to send", self._destination) @@ -303,22 +301,25 @@ class PerDestinationQueue(object): except HttpResponseException as e: logger.warning( "TX [%s] Received %d response to transaction: %s", - self._destination, e.code, e, + self._destination, + e.code, + e, ) except RequestSendFailed as e: - logger.warning("TX [%s] Failed to send transaction: %s", self._destination, e) + logger.warning( + "TX [%s] Failed to send transaction: %s", self._destination, e + ) for p, _ in pending_pdus: - logger.info("Failed to send event %s to %s", p.event_id, - self._destination) + logger.info( + "Failed to send event %s to %s", p.event_id, self._destination + ) except Exception: - logger.exception( - "TX [%s] Failed to send transaction", - self._destination, - ) + logger.exception("TX [%s] Failed to send transaction", self._destination) for p, _ in pending_pdus: - logger.info("Failed to send event %s to %s", p.event_id, - self._destination) + logger.info( + "Failed to send event %s to %s", p.event_id, self._destination + ) finally: # We want to be *very* sure we clear this after we stop processing self.transmission_loop_running = False @@ -367,7 +368,10 @@ class PerDestinationQueue(object): last_device_stream_id = self._last_device_stream_id to_device_stream_id = self._store.get_to_device_stream_token() contents, stream_id = yield self._store.get_new_device_msgs_for_remote( - self._destination, last_device_stream_id, to_device_stream_id, limit - len(edus) + self._destination, + last_device_stream_id, + to_device_stream_id, + limit - len(edus), ) edus.extend( Edu( |