summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2016-09-09 13:49:56 +0100
committerGitHub <noreply@github.com>2016-09-09 13:49:56 +0100
commit685da5a3b0a24516922e149b17c7a7ab98356273 (patch)
treee3638b1c7c6fd4a4d9ee0cc0483aa2330408937c /synapse/federation
parentMerge pull request #1089 from matrix-org/markjh/direct_to_device_stream (diff)
parentCheck if destination is ready for retry earlier (diff)
downloadsynapse-685da5a3b0a24516922e149b17c7a7ab98356273.tar.xz
Merge pull request #1092 from matrix-org/erikj/transaction_queue_check
Check if destination is ready for retry earlier
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/transaction_queue.py31
1 files changed, 16 insertions, 15 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index f8d3fffe95..d9b8b3fc1d 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -192,6 +192,12 @@ class TransactionQueue(object):
                     pending_edus = self.pending_edus_by_dest.pop(destination, [])
                     pending_failures = self.pending_failures_by_dest.pop(destination, [])
 
+                    limiter = yield get_retry_limiter(
+                        destination,
+                        self.clock,
+                        self.store,
+                    )
+
                     device_message_edus, device_stream_id = (
                         yield self._get_new_device_messages(destination)
                     )
@@ -212,10 +218,18 @@ class TransactionQueue(object):
                     success = yield self._send_new_transaction(
                         destination, pending_pdus, pending_edus, pending_failures,
                         device_stream_id,
-                        should_delete_from_device_stream=bool(device_message_edus)
+                        should_delete_from_device_stream=bool(device_message_edus),
+                        limiter=limiter,
                     )
                     if not success:
                         break
+        except NotRetryingDestination:
+            logger.info(
+                "TX [%s] not ready for retry yet - "
+                "dropping transaction for now",
+                destination,
+            )
+            success = False
         finally:
             # We want to be *very* sure we delete this after we stop processing
             self.pending_transactions.pop(destination, None)
@@ -242,7 +256,7 @@ class TransactionQueue(object):
     @defer.inlineCallbacks
     def _send_new_transaction(self, destination, pending_pdus, pending_edus,
                               pending_failures, device_stream_id,
-                              should_delete_from_device_stream):
+                              should_delete_from_device_stream, limiter):
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[1])
@@ -257,12 +271,6 @@ class TransactionQueue(object):
 
             txn_id = str(self._next_txn_id)
 
-            limiter = yield get_retry_limiter(
-                destination,
-                self.clock,
-                self.store,
-            )
-
             logger.debug(
                 "TX [%s] {%s} Attempting new transaction"
                 " (pdus: %d, edus: %d, failures: %d)",
@@ -359,13 +367,6 @@ class TransactionQueue(object):
                         destination, device_stream_id
                     )
                 self.last_device_stream_id_by_dest[destination] = device_stream_id
-        except NotRetryingDestination:
-            logger.info(
-                "TX [%s] not ready for retry yet - "
-                "dropping transaction for now",
-                destination,
-            )
-            success = False
         except RuntimeError as e:
             # We capture this here as there as nothing actually listens
             # for this finishing functions deferred.