diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index bb3d9258a6..90235ff098 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -303,18 +303,10 @@ class TransactionQueue(object):
try:
self.pending_transactions[destination] = 1
+ # XXX: what's this for?
yield run_on_reactor()
while True:
- pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
- pending_edus = self.pending_edus_by_dest.pop(destination, [])
- pending_presence = self.pending_presence_by_dest.pop(destination, {})
- pending_failures = self.pending_failures_by_dest.pop(destination, [])
-
- pending_edus.extend(
- self.pending_edus_keyed_by_dest.pop(destination, {}).values()
- )
-
limiter = yield get_retry_limiter(
destination,
self.clock,
@@ -326,6 +318,24 @@ class TransactionQueue(object):
yield self._get_new_device_messages(destination)
)
+ # BEGIN CRITICAL SECTION
+ #
+ # In order to avoid a race condition, we need to make sure that
+ # the following code (from popping the queues up to the point
+ # where we decide if we actually have any pending messages) is
+ # atomic - otherwise new PDUs or EDUs might arrive in the
+ # meantime, but not get sent because we hold the
+ # pending_transactions flag.
+
+ pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+ pending_edus = self.pending_edus_by_dest.pop(destination, [])
+ pending_presence = self.pending_presence_by_dest.pop(destination, {})
+ pending_failures = self.pending_failures_by_dest.pop(destination, [])
+
+ pending_edus.extend(
+ self.pending_edus_keyed_by_dest.pop(destination, {}).values()
+ )
+
pending_edus.extend(device_message_edus)
if pending_presence:
pending_edus.append(
@@ -355,6 +365,8 @@ class TransactionQueue(object):
)
return
+ # END CRITICAL SECTION
+
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures,
limiter=limiter,
|