summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/federation/replication.py52
-rw-r--r--synapse/http/matrixfederationclient.py29
-rw-r--r--synapse/storage/transactions.py50
3 files changed, 85 insertions, 46 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 589a3f581b..0cb632fb08 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -728,7 +728,7 @@ class _TransactionQueue(object):
             self.pending_pdus_by_dest.setdefault(destination, []).append(
                 (pdu, deferred, order)
             )
-            
+
             def eb(failure):
                 if not deferred.called:
                     deferred.errback(failure)
@@ -745,7 +745,7 @@ class _TransactionQueue(object):
     # NO inlineCallbacks
     def enqueue_edu(self, edu):
         destination = edu.destination
-        
+
         if destination == self.server_name:
             return
 
@@ -776,7 +776,7 @@ class _TransactionQueue(object):
         )
 
         yield deferred
-        
+
     @defer.inlineCallbacks
     @log_function
     def _attempt_new_transaction(self, destination):
@@ -790,12 +790,15 @@ class _TransactionQueue(object):
                 retry_timings.retry_last_ts, retry_timings.retry_interval
             )
             if retry_last_ts + retry_interval > int(self._clock.time_msec()):
-                logger.info("TX [%s] not ready for retry yet - "
-                            "dropping transaction for now", destination)
+                logger.info(
+                    "TX [%s] not ready for retry yet - "
+                    "dropping transaction for now",
+                    destination,
+                )
                 return
             else:
                 logger.info("TX [%s] is ready for retry", destination)
-        
+
         if destination in self.pending_transactions:
             # XXX: pending_transactions can get stuck on by a never-ending
             # request at which point pending_pdus_by_dest just keeps growing.
@@ -811,10 +814,14 @@ class _TransactionQueue(object):
         if not pending_pdus and not pending_edus and not pending_failures:
             return
 
-        logger.debug("TX [%s] Attempting new transaction "
-                    "(pdus: %d, edus: %d, failures: %d)",
+        logger.debug(
+            "TX [%s] Attempting new transaction "
+            "(pdus: %d, edus: %d, failures: %d)",
             destination,
-            len(pending_pdus), len(pending_edus), len(pending_failures))
+            len(pending_pdus),
+            len(pending_edus),
+            len(pending_failures)
+        )
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[2])
@@ -847,8 +854,11 @@ class _TransactionQueue(object):
             yield self.transaction_actions.prepare_to_send(transaction)
 
             logger.debug("TX [%s] Persisted transaction", destination)
-            logger.info("TX [%s] Sending transaction [%s]", destination,
-                        transaction.transaction_id)
+            logger.info(
+                "TX [%s] Sending transaction [%s]",
+                destination,
+                transaction.transaction_id,
+            )
 
             # Actually send the transaction
 
@@ -905,11 +915,14 @@ class _TransactionQueue(object):
         except Exception as e:
             # We capture this here as there as nothing actually listens
             # for this finishing functions deferred.
-            logger.warn("TX [%s] Problem in _attempt_transaction: %s",
-                             destination, e)
+            logger.warn(
+                "TX [%s] Problem in _attempt_transaction: %s",
+                destination,
+                e,
+            )
 
             self.set_retrying(destination, retry_interval)
-            
+
             for deferred in deferreds:
                 if not deferred.called:
                     deferred.errback(e)
@@ -925,12 +938,17 @@ class _TransactionQueue(object):
     def set_retrying(self, destination, retry_interval):
         # track that this destination is having problems and we should
         # give it a chance to recover before trying it again
+
         if retry_interval:
             retry_interval *= 2
             # plateau at hourly retries for now
             if retry_interval >= 60 * 60 * 1000:
                 retry_interval = 60 * 60 * 1000
         else:
-            retry_interval = 2000 # try again at first after 2 seconds
-        yield self.store.set_destination_retry_timings(destination,
-            int(self._clock.time_msec()), retry_interval)
+            retry_interval = 2000  # try again at first after 2 seconds
+
+        yield self.store.set_destination_retry_timings(
+            destination,
+            int(self._clock.time_msec()),
+            retry_interval
+        )
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 8fc6bf8f97..16fb2adab5 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -130,12 +130,20 @@ class MatrixFederationHttpClient(object):
                 break
             except Exception as e:
                 if not retry_on_dns_fail and isinstance(e, DNSLookupError):
-                    logger.warn("DNS Lookup failed to %s with %s", destination,
-                                e)
+                    logger.warn(
+                        "DNS Lookup failed to %s with %s",
+                        destination,
+                        e
+                    )
                     raise SynapseError(400, "Domain specified not found.")
 
-                logger.warn("Sending request failed to %s: %s %s : %s",
-                                 destination, method, url_bytes, e)
+                logger.warn(
+                    "Sending request failed to %s: %s %s : %s",
+                    destination,
+                    method,
+                    url_bytes,
+                    e
+                )
                 _print_ex(e)
 
                 if retries_left:
@@ -144,10 +152,15 @@ class MatrixFederationHttpClient(object):
                 else:
                     raise
 
-        logger.info("Received response %d %s for %s: %s %s",
-                    response.code, response.phrase,
-                    destination, method, url_bytes)
-        
+        logger.info(
+            "Received response %d %s for %s: %s %s",
+            response.code,
+            response.phrase,
+            destination,
+            method,
+            url_bytes
+        )
+
         if 200 <= response.code < 300:
             # We need to update the transactions table to say it was sent?
             pass
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 2b16787695..423cc3f02a 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
 class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
     """
-    
+
     # a write-through cache of DestinationsTable.EntryType indexed by
     # destination string
     destination_retry_cache = {}
@@ -213,21 +213,21 @@ class TransactionStore(SQLBaseStore):
 
     def get_destination_retry_timings(self, destination):
         """Gets the current retry timings (if any) for a given destination.
-        
+
         Args:
             destination (str)
-        
+
         Returns:
             None if not retrying
             Otherwise a DestinationsTable.EntryType for the retry scheme
         """
         if destination in self.destination_retry_cache:
             return defer.succeed(self.destination_retry_cache[destination])
-        
+
         return self.runInteraction(
             "get_destination_retry_timings",
             self._get_destination_retry_timings, destination)
-            
+
     def _get_destination_retry_timings(cls, txn, destination):
         query = DestinationsTable.select_statement("destination = ?")
         txn.execute(query, (destination,))
@@ -238,30 +238,36 @@ class TransactionStore(SQLBaseStore):
                 return result
             else:
                 return None
-        
+
     def set_destination_retry_timings(self, destination,
                                       retry_last_ts, retry_interval):
         """Sets the current retry timings for a given destination.
         Both timings should be zero if retrying is no longer occuring.
-        
+
         Args:
             destination (str)
             retry_last_ts (int) - time of last retry attempt in unix epoch ms
             retry_interval (int) - how long until next retry in ms
         """
-        
+
         self.destination_retry_cache[destination] = (
-            DestinationsTable.EntryType(destination,
-                                        retry_last_ts, retry_interval)
+            DestinationsTable.EntryType(
+                destination,
+                retry_last_ts,
+                retry_interval
+            )
         )
-        
+
         # XXX: we could chose to not bother persisting this if our cache thinks
         # this is a NOOP
         return self.runInteraction(
             "set_destination_retry_timings",
-            self._set_destination_retry_timings, destination, 
-            retry_last_ts, retry_interval)
-            
+            self._set_destination_retry_timings,
+            destination,
+            retry_last_ts,
+            retry_interval,
+        )
+
     def _set_destination_retry_timings(cls, txn, destination,
                                        retry_last_ts, retry_interval):
 
@@ -275,21 +281,22 @@ class TransactionStore(SQLBaseStore):
 
     def get_destinations_needing_retry(self):
         """Get all destinations which are due a retry for sending a transaction.
-                    
+
         Returns:
             list: A list of `DestinationsTable.EntryType`
         """
-                
+
         return self.runInteraction(
             "get_destinations_needing_retry",
             self._get_destinations_needing_retry
         )
-        
+
     def _get_destinations_needing_retry(cls, txn):
         where = "retry_last_ts > 0 and retry_next_ts < now()"
         query = DestinationsTable.select_statement(where)
         txn.execute(query)
-        return DestinationsTable.decode_results(txn.fetchall())    
+        return DestinationsTable.decode_results(txn.fetchall())
+
 
 class ReceivedTransactionsTable(Table):
     table_name = "received_transactions"
@@ -332,14 +339,15 @@ class TransactionsToPduTable(Table):
     ]
 
     EntryType = namedtuple("TransactionsToPduEntry", fields)
-    
+
+
 class DestinationsTable(Table):
     table_name = "destinations"
-    
+
     fields = [
         "destination",
         "retry_last_ts",
         "retry_interval",
     ]
 
-    EntryType = namedtuple("DestinationsEntry", fields)
\ No newline at end of file
+    EntryType = namedtuple("DestinationsEntry", fields)