summary refs log tree commit diff
path: root/synapse/federation/replication.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/replication.py')
-rw-r--r--synapse/federation/replication.py52
1 files changed, 35 insertions, 17 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 589a3f581b..0cb632fb08 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -728,7 +728,7 @@ class _TransactionQueue(object):
             self.pending_pdus_by_dest.setdefault(destination, []).append(
                 (pdu, deferred, order)
             )
-            
+
             def eb(failure):
                 if not deferred.called:
                     deferred.errback(failure)
@@ -745,7 +745,7 @@ class _TransactionQueue(object):
     # NO inlineCallbacks
     def enqueue_edu(self, edu):
         destination = edu.destination
-        
+
         if destination == self.server_name:
             return
 
@@ -776,7 +776,7 @@ class _TransactionQueue(object):
         )
 
         yield deferred
-        
+
     @defer.inlineCallbacks
     @log_function
     def _attempt_new_transaction(self, destination):
@@ -790,12 +790,15 @@ class _TransactionQueue(object):
                 retry_timings.retry_last_ts, retry_timings.retry_interval
             )
             if retry_last_ts + retry_interval > int(self._clock.time_msec()):
-                logger.info("TX [%s] not ready for retry yet - "
-                            "dropping transaction for now", destination)
+                logger.info(
+                    "TX [%s] not ready for retry yet - "
+                    "dropping transaction for now",
+                    destination,
+                )
                 return
             else:
                 logger.info("TX [%s] is ready for retry", destination)
-        
+
         if destination in self.pending_transactions:
             # XXX: pending_transactions can get stuck on by a never-ending
             # request at which point pending_pdus_by_dest just keeps growing.
@@ -811,10 +814,14 @@ class _TransactionQueue(object):
         if not pending_pdus and not pending_edus and not pending_failures:
             return
 
-        logger.debug("TX [%s] Attempting new transaction "
-                    "(pdus: %d, edus: %d, failures: %d)",
+        logger.debug(
+            "TX [%s] Attempting new transaction "
+            "(pdus: %d, edus: %d, failures: %d)",
             destination,
-            len(pending_pdus), len(pending_edus), len(pending_failures))
+            len(pending_pdus),
+            len(pending_edus),
+            len(pending_failures)
+        )
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[2])
@@ -847,8 +854,11 @@ class _TransactionQueue(object):
             yield self.transaction_actions.prepare_to_send(transaction)
 
             logger.debug("TX [%s] Persisted transaction", destination)
-            logger.info("TX [%s] Sending transaction [%s]", destination,
-                        transaction.transaction_id)
+            logger.info(
+                "TX [%s] Sending transaction [%s]",
+                destination,
+                transaction.transaction_id,
+            )
 
             # Actually send the transaction
 
@@ -905,11 +915,14 @@ class _TransactionQueue(object):
         except Exception as e:
             # We capture this here as there as nothing actually listens
             # for this finishing functions deferred.
-            logger.warn("TX [%s] Problem in _attempt_transaction: %s",
-                             destination, e)
+            logger.warn(
+                "TX [%s] Problem in _attempt_transaction: %s",
+                destination,
+                e,
+            )
 
             self.set_retrying(destination, retry_interval)
-            
+
             for deferred in deferreds:
                 if not deferred.called:
                     deferred.errback(e)
@@ -925,12 +938,17 @@ class _TransactionQueue(object):
     def set_retrying(self, destination, retry_interval):
         # track that this destination is having problems and we should
         # give it a chance to recover before trying it again
+
         if retry_interval:
             retry_interval *= 2
             # plateau at hourly retries for now
             if retry_interval >= 60 * 60 * 1000:
                 retry_interval = 60 * 60 * 1000
         else:
-            retry_interval = 2000 # try again at first after 2 seconds
-        yield self.store.set_destination_retry_timings(destination,
-            int(self._clock.time_msec()), retry_interval)
+            retry_interval = 2000  # try again at first after 2 seconds
+
+        yield self.store.set_destination_retry_timings(
+            destination,
+            int(self._clock.time_msec()),
+            retry_interval
+        )