diff options
author | Erik Johnston <erikj@jki.re> | 2016-09-09 13:49:56 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-09 13:49:56 +0100 |
commit | 685da5a3b0a24516922e149b17c7a7ab98356273 (patch) | |
tree | e3638b1c7c6fd4a4d9ee0cc0483aa2330408937c | |
parent | Merge pull request #1089 from matrix-org/markjh/direct_to_device_stream (diff) | |
parent | Check if destination is ready for retry earlier (diff) | |
download | synapse-685da5a3b0a24516922e149b17c7a7ab98356273.tar.xz |
Merge pull request #1092 from matrix-org/erikj/transaction_queue_check
Check if destination is ready for retry earlier
-rw-r--r-- | synapse/federation/transaction_queue.py | 31 |
1 files changed, 16 insertions, 15 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f8d3fffe95..d9b8b3fc1d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -192,6 +192,12 @@ class TransactionQueue(object): pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) + limiter = yield get_retry_limiter( + destination, + self.clock, + self.store, + ) + device_message_edus, device_stream_id = ( yield self._get_new_device_messages(destination) ) @@ -212,10 +218,18 @@ class TransactionQueue(object): success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, device_stream_id, - should_delete_from_device_stream=bool(device_message_edus) + should_delete_from_device_stream=bool(device_message_edus), + limiter=limiter, ) if not success: break + except NotRetryingDestination: + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) + success = False finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) @@ -242,7 +256,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, pending_failures, device_stream_id, - should_delete_from_device_stream): + should_delete_from_device_stream, limiter): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -257,12 +271,6 @@ class TransactionQueue(object): txn_id = str(self._next_txn_id) - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - ) - logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", @@ -359,13 +367,6 @@ class TransactionQueue(object): destination, device_stream_id ) self.last_device_stream_id_by_dest[destination] = device_stream_id - except NotRetryingDestination: - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - success = False except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. |