diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/replication.py | 52 |
1 files changed, 35 insertions, 17 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 589a3f581b..0cb632fb08 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -728,7 +728,7 @@ class _TransactionQueue(object): self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, deferred, order) ) - + def eb(failure): if not deferred.called: deferred.errback(failure) @@ -745,7 +745,7 @@ class _TransactionQueue(object): # NO inlineCallbacks def enqueue_edu(self, edu): destination = edu.destination - + if destination == self.server_name: return @@ -776,7 +776,7 @@ class _TransactionQueue(object): ) yield deferred - + @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): @@ -790,12 +790,15 @@ class _TransactionQueue(object): retry_timings.retry_last_ts, retry_timings.retry_interval ) if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info("TX [%s] not ready for retry yet - " - "dropping transaction for now", destination) + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) return else: logger.info("TX [%s] is ready for retry", destination) - + if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. @@ -811,10 +814,14 @@ class _TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: return - logger.debug("TX [%s] Attempting new transaction " - "(pdus: %d, edus: %d, failures: %d)", + logger.debug( + "TX [%s] Attempting new transaction " + "(pdus: %d, edus: %d, failures: %d)", destination, - len(pending_pdus), len(pending_edus), len(pending_failures)) + len(pending_pdus), + len(pending_edus), + len(pending_failures) + ) # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -847,8 +854,11 @@ class _TransactionQueue(object): yield self.transaction_actions.prepare_to_send(transaction) logger.debug("TX [%s] Persisted transaction", destination) - logger.info("TX [%s] Sending transaction [%s]", destination, - transaction.transaction_id) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, + ) # Actually send the transaction @@ -905,11 +915,14 @@ class _TransactionQueue(object): except Exception as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.warn("TX [%s] Problem in _attempt_transaction: %s", - destination, e) + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) self.set_retrying(destination, retry_interval) - + for deferred in deferreds: if not deferred.called: deferred.errback(e) @@ -925,12 +938,17 @@ class _TransactionQueue(object): def set_retrying(self, destination, retry_interval): # track that this destination is having problems and we should # give it a chance to recover before trying it again + if retry_interval: retry_interval *= 2 # plateau at hourly retries for now if retry_interval >= 60 * 60 * 1000: retry_interval = 60 * 60 * 1000 else: - retry_interval = 2000 # try again at first after 2 seconds - yield self.store.set_destination_retry_timings(destination, - int(self._clock.time_msec()), retry_interval) + retry_interval = 2000 # try again at first after 2 seconds + + yield self.store.set_destination_retry_timings( + destination, + int(self._clock.time_msec()), + retry_interval + ) |