summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/replication.py30
-rw-r--r--synapse/http/matrixfederationclient.py7
-rw-r--r--synapse/storage/transactions.py18
3 files changed, 36 insertions, 19 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index c4c6667b62..c242488483 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -778,21 +778,25 @@ class _TransactionQueue(object):
     def _attempt_new_transaction(self, destination):
 
         (retry_last_ts, retry_interval) = (0, 0)
-        retry_timings = yield self.store.get_destination_retry_timings(destination)
+        retry_timings = yield self.store.get_destination_retry_timings(
+            destination
+        )
         if retry_timings:
             (retry_last_ts, retry_interval) = (
                 retry_timings.retry_last_ts, retry_timings.retry_interval
             )
             if retry_last_ts + retry_interval > int(self._clock.time_msec()):
-                logger.info("TX [%s] not ready for retry yet - dropping transaction for now", destination)
+                logger.info("TX [%s] not ready for retry yet - "
+                            "dropping transaction for now", destination)
                 return
             else:
                 logger.info("TX [%s] is ready for retry", destination)
         
         if destination in self.pending_transactions:
-            # XXX: pending_transactions can get stuck on by a never-ending request
-            # at which point pending_pdus_by_dest just keeps growing.
-            # we need application-layer timeouts of some flavour of these requests
+            # XXX: pending_transactions can get stuck on by a never-ending
+            # request at which point pending_pdus_by_dest just keeps growing.
+            # we need application-layer timeouts of some flavour of these
+            # requests
             return
 
         # list of (pending_pdu, deferred, order)
@@ -803,8 +807,10 @@ class _TransactionQueue(object):
         if not pending_pdus and not pending_edus and not pending_failures:
             return
 
-        logger.debug("TX [%s] Attempting new transaction (pdus: %d, edus: %d, failures: %d)",
-            destination, len(pending_pdus), len(pending_edus), len(pending_failures))
+        logger.debug("TX [%s] Attempting new transaction "
+                    "(pdus: %d, edus: %d, failures: %d)",
+            destination,
+            len(pending_pdus), len(pending_edus), len(pending_failures))
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[2])
@@ -837,7 +843,8 @@ class _TransactionQueue(object):
             yield self.transaction_actions.prepare_to_send(transaction)
 
             logger.debug("TX [%s] Persisted transaction", destination)
-            logger.info("TX [%s] Sending transaction [%s]", destination, transaction.transaction_id)
+            logger.info("TX [%s] Sending transaction [%s]", destination,
+                        transaction.transaction_id)
 
             # Actually send the transaction
 
@@ -874,7 +881,9 @@ class _TransactionQueue(object):
                 if code == 200:
                     if retry_last_ts:
                         # this host is alive! reset retry schedule
-                        yield self.store.set_destination_retry_timings(destination, 0, 0)
+                        yield self.store.set_destination_retry_timings(
+                            destination, 0, 0
+                        )
                     deferred.callback(None)
                 else:
                     self.start_retrying(destination, retry_interval)
@@ -892,7 +901,8 @@ class _TransactionQueue(object):
         except Exception as e:
             # We capture this here as there as nothing actually listens
             # for this finishing functions deferred.
-            logger.exception("TX [%s] Problem in _attempt_transaction: %s", destination, e)
+            logger.exception("TX [%s] Problem in _attempt_transaction: %s",
+                             destination, e)
 
             self.start_retrying(destination, retry_interval)
             
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 3edc59dbab..c76990904d 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -90,7 +90,7 @@ class MatrixFederationHttpClient(object):
         )
 
         logger.info("Sending request to %s: %s %s",
-                     destination, method, url_bytes)
+                    destination, method, url_bytes)
 
         logger.debug(
             "Types: %s",
@@ -135,7 +135,7 @@ class MatrixFederationHttpClient(object):
                     raise SynapseError(400, "Domain specified not found.")
 
                 logger.exception("Sending request failed to %s: %s %s : %s",
-                    destination, method, url_bytes, e)
+                                 destination, method, url_bytes, e)
                 _print_ex(e)
 
                 if retries_left:
@@ -145,7 +145,8 @@ class MatrixFederationHttpClient(object):
                     raise
 
         logger.info("Received response %d %s for %s: %s %s",
-            response.code, response.phrase, destination, method, url_bytes)
+                    response.code, response.phrase,
+                    destination, method, url_bytes)
         
         if 200 <= response.code < 300:
             # We need to update the transactions table to say it was sent?
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 237b024451..2b16787695 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -28,7 +28,8 @@ class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
     """
     
-    # a write-through cache of DestinationsTable.EntryType indexed by destination string
+    # a write-through cache of DestinationsTable.EntryType indexed by
+    # destination string
     destination_retry_cache = {}
 
     def get_received_txn_response(self, transaction_id, origin):
@@ -238,7 +239,8 @@ class TransactionStore(SQLBaseStore):
             else:
                 return None
         
-    def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval):
+    def set_destination_retry_timings(self, destination,
+                                      retry_last_ts, retry_interval):
         """Sets the current retry timings for a given destination.
         Both timings should be zero if retrying is no longer occuring.
         
@@ -249,15 +251,19 @@ class TransactionStore(SQLBaseStore):
         """
         
         self.destination_retry_cache[destination] = (
-            DestinationsTable.EntryType(destination, retry_last_ts, retry_interval)
+            DestinationsTable.EntryType(destination,
+                                        retry_last_ts, retry_interval)
         )
         
-        # xxx: we could chose to not bother persisting this if our cache things this is a NOOP
+        # XXX: we could chose to not bother persisting this if our cache thinks
+        # this is a NOOP
         return self.runInteraction(
             "set_destination_retry_timings",
-            self._set_destination_retry_timings, destination, retry_last_ts, retry_interval)
+            self._set_destination_retry_timings, destination, 
+            retry_last_ts, retry_interval)
             
-    def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval):
+    def _set_destination_retry_timings(cls, txn, destination,
+                                       retry_last_ts, retry_interval):
 
         query = (
             "INSERT OR REPLACE INTO %s "