diff options
author | Erik Johnston <erik@matrix.org> | 2016-09-09 13:46:05 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-09-09 13:46:05 +0100 |
commit | a6c67501666c0fefeae8edec2c5f7755a9d24fb8 (patch) | |
tree | 241fb3ff7ea164411d1dcc6f23968cc2d90e2446 | |
parent | Fix tightloop on sending transaction (diff) | |
download | synapse-a6c67501666c0fefeae8edec2c5f7755a9d24fb8.tar.xz |
Check if destination is ready for retry earlier
Diffstat (limited to '')
-rw-r--r-- | synapse/federation/transaction_queue.py | 31 |
1 files changed, 16 insertions, 15 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f8d3fffe95..d9b8b3fc1d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -192,6 +192,12 @@ class TransactionQueue(object): pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) + limiter = yield get_retry_limiter( + destination, + self.clock, + self.store, + ) + device_message_edus, device_stream_id = ( yield self._get_new_device_messages(destination) ) @@ -212,10 +218,18 @@ class TransactionQueue(object): success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, device_stream_id, - should_delete_from_device_stream=bool(device_message_edus) + should_delete_from_device_stream=bool(device_message_edus), + limiter=limiter, ) if not success: break + except NotRetryingDestination: + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) + success = False finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) @@ -242,7 +256,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, pending_failures, device_stream_id, - should_delete_from_device_stream): + should_delete_from_device_stream, limiter): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -257,12 +271,6 @@ class TransactionQueue(object): txn_id = str(self._next_txn_id) - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - ) - logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", @@ -359,13 +367,6 @@ class TransactionQueue(object): destination, device_stream_id ) self.last_device_stream_id_by_dest[destination] = device_stream_id - except NotRetryingDestination: - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - success = False except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. |