diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5c7245d383..6900b0121b 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -170,44 +170,53 @@ class TransactionQueue(object):
@defer.inlineCallbacks
def _attempt_new_transaction(self, destination):
- yield run_on_reactor()
- while True:
- # list of (pending_pdu, deferred, order)
- if destination in self.pending_transactions:
- # XXX: pending_transactions can get stuck on by a never-ending
- # request at which point pending_pdus_by_dest just keeps growing.
- # we need application-layer timeouts of some flavour of these
- # requests
- logger.debug(
- "TX [%s] Transaction already in progress",
- destination
- )
- return
+ # list of (pending_pdu, deferred, order)
+ if destination in self.pending_transactions:
+ # XXX: pending_transactions can get stuck on by a never-ending
+ # request at which point pending_pdus_by_dest just keeps growing.
+ # we need application-layer timeouts of some flavour of these
+ # requests
+ logger.debug(
+ "TX [%s] Transaction already in progress",
+ destination
+ )
+ return
- pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
- pending_edus = self.pending_edus_by_dest.pop(destination, [])
- pending_failures = self.pending_failures_by_dest.pop(destination, [])
+ try:
+ self.pending_transactions[destination] = 1
- device_message_edus, device_stream_id = (
- yield self._get_new_device_messages(destination)
- )
+ yield run_on_reactor()
- pending_edus.extend(device_message_edus)
+ while True:
+ pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+ pending_edus = self.pending_edus_by_dest.pop(destination, [])
+ pending_failures = self.pending_failures_by_dest.pop(destination, [])
- if pending_pdus:
- logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
- destination, len(pending_pdus))
+ device_message_edus, device_stream_id = (
+ yield self._get_new_device_messages(destination)
+ )
- if not pending_pdus and not pending_edus and not pending_failures:
- logger.debug("TX [%s] Nothing to send", destination)
- self.last_device_stream_id_by_dest[destination] = device_stream_id
- return
+ pending_edus.extend(device_message_edus)
- yield self._send_new_transaction(
- destination, pending_pdus, pending_edus, pending_failures,
- device_stream_id,
- should_delete_from_device_stream=bool(device_message_edus)
- )
+ if pending_pdus:
+ logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+ destination, len(pending_pdus))
+
+ if not pending_pdus and not pending_edus and not pending_failures:
+ logger.debug("TX [%s] Nothing to send", destination)
+ self.last_device_stream_id_by_dest[destination] = (
+ device_stream_id
+ )
+ return
+
+ yield self._send_new_transaction(
+ destination, pending_pdus, pending_edus, pending_failures,
+ device_stream_id,
+ should_delete_from_device_stream=bool(device_message_edus)
+ )
+ finally:
+ # We want to be *very* sure we delete this after we stop processing
+ self.pending_transactions.pop(destination, None)
@defer.inlineCallbacks
def _get_new_device_messages(self, destination):
@@ -240,8 +249,6 @@ class TransactionQueue(object):
failures = [x.get_dict() for x in pending_failures]
try:
- self.pending_transactions[destination] = 1
-
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
@@ -375,7 +382,3 @@ class TransactionQueue(object):
for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination)
-
- finally:
- # We want to be *very* sure we delete this after we stop processing
- self.pending_transactions.pop(destination, None)
|