summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-03-13 09:59:54 +0000
committerErik Johnston <erik@matrix.org>2017-03-13 09:59:54 +0000
commit672dcf59d3f9da12da43a77e316e2eec76d3ee4c (patch)
tree9186644b06ef01e609ca018e02100ceecf0f3ea4 /synapse/federation/transaction_queue.py
parentMerge tag 'v0.19.3-rc1' into release-v0.19.3 (diff)
parentRevert "Support registration & login with phone number" (diff)
downloadsynapse-672dcf59d3f9da12da43a77e316e2eec76d3ee4c.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into release-v0.19.3
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py30
1 files changed, 21 insertions, 9 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index bb3d9258a6..90235ff098 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -303,18 +303,10 @@ class TransactionQueue(object):
         try:
             self.pending_transactions[destination] = 1
 
+            # XXX: what's this for?
             yield run_on_reactor()
 
             while True:
-                pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
-                pending_edus = self.pending_edus_by_dest.pop(destination, [])
-                pending_presence = self.pending_presence_by_dest.pop(destination, {})
-                pending_failures = self.pending_failures_by_dest.pop(destination, [])
-
-                pending_edus.extend(
-                    self.pending_edus_keyed_by_dest.pop(destination, {}).values()
-                )
-
                 limiter = yield get_retry_limiter(
                     destination,
                     self.clock,
@@ -326,6 +318,24 @@ class TransactionQueue(object):
                     yield self._get_new_device_messages(destination)
                 )
 
+                # BEGIN CRITICAL SECTION
+                #
+                # In order to avoid a race condition, we need to make sure that
+                # the following code (from popping the queues up to the point
+                # where we decide if we actually have any pending messages) is
+                # atomic - otherwise new PDUs or EDUs might arrive in the
+                # meantime, but not get sent because we hold the
+                # pending_transactions flag.
+
+                pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+                pending_edus = self.pending_edus_by_dest.pop(destination, [])
+                pending_presence = self.pending_presence_by_dest.pop(destination, {})
+                pending_failures = self.pending_failures_by_dest.pop(destination, [])
+
+                pending_edus.extend(
+                    self.pending_edus_keyed_by_dest.pop(destination, {}).values()
+                )
+
                 pending_edus.extend(device_message_edus)
                 if pending_presence:
                     pending_edus.append(
@@ -355,6 +365,8 @@ class TransactionQueue(object):
                     )
                     return
 
+                # END CRITICAL SECTION
+
                 success = yield self._send_new_transaction(
                     destination, pending_pdus, pending_edus, pending_failures,
                     limiter=limiter,