diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/federation/transaction_queue.py | 47 |
1 files changed, 24 insertions, 23 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 32fa5e8c15..2a7dd343f3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -202,19 +202,6 @@ class TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): - if destination in self.pending_transactions: - # XXX: pending_transactions can get stuck on by a never-ending - # request at which point pending_pdus_by_dest just keeps growing. - # we need application-layer timeouts of some flavour of these - # requests - logger.debug( - "TX [%s] Transaction already in progress", - destination - ) - return - - logger.debug("TX [%s] _attempt_new_transaction", destination) - # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) @@ -228,20 +215,34 @@ class TransactionQueue(object): logger.debug("TX [%s] Nothing to send", destination) return - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[2]) - - pdus = [x[0] for x in pending_pdus] - edus = [x[0] for x in pending_edus] - failures = [x[0].get_dict() for x in pending_failures] - deferreds = [ - x[1] - for x in pending_pdus + pending_edus + pending_failures - ] + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests + logger.debug( + "TX [%s] Transaction already in progress", + destination + ) + return + # NOTE: Nothing should be between the above check and the insertion below try: self.pending_transactions[destination] = 1 + logger.debug("TX [%s] _attempt_new_transaction", destination) + + # Sort based on the order field + pending_pdus.sort(key=lambda t: t[2]) + + pdus = [x[0] for x in pending_pdus] + edus = [x[0] for x in pending_edus] + failures = [x[0].get_dict() for x in pending_failures] + deferreds = [ + x[1] + for x in pending_pdus + pending_edus + pending_failures + ] + txn_id = str(self._next_txn_id) limiter = yield get_retry_limiter( |