summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py3
-rw-r--r--synapse/federation/transaction_queue.py30
2 files changed, 22 insertions, 11 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b5bcfd705a..5dcd4eecce 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -206,8 +206,7 @@ class FederationClient(FederationBase):
 
         Args:
             destinations (list): Which home servers to query
-            pdu_origin (str): The home server that originally sent the pdu.
-            event_id (str)
+            event_id (str): event to fetch
             outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
                 it's from an arbitary point in the context as opposed to part
                 of the current block of PDUs. Defaults to `False`
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index bb3d9258a6..90235ff098 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -303,18 +303,10 @@ class TransactionQueue(object):
         try:
             self.pending_transactions[destination] = 1
 
+            # XXX: what's this for?
             yield run_on_reactor()
 
             while True:
-                pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
-                pending_edus = self.pending_edus_by_dest.pop(destination, [])
-                pending_presence = self.pending_presence_by_dest.pop(destination, {})
-                pending_failures = self.pending_failures_by_dest.pop(destination, [])
-
-                pending_edus.extend(
-                    self.pending_edus_keyed_by_dest.pop(destination, {}).values()
-                )
-
                 limiter = yield get_retry_limiter(
                     destination,
                     self.clock,
@@ -326,6 +318,24 @@ class TransactionQueue(object):
                     yield self._get_new_device_messages(destination)
                 )
 
+                # BEGIN CRITICAL SECTION
+                #
+                # In order to avoid a race condition, we need to make sure that
+                # the following code (from popping the queues up to the point
+                # where we decide if we actually have any pending messages) is
+                # atomic - otherwise new PDUs or EDUs might arrive in the
+                # meantime, but not get sent because we hold the
+                # pending_transactions flag.
+
+                pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+                pending_edus = self.pending_edus_by_dest.pop(destination, [])
+                pending_presence = self.pending_presence_by_dest.pop(destination, {})
+                pending_failures = self.pending_failures_by_dest.pop(destination, [])
+
+                pending_edus.extend(
+                    self.pending_edus_keyed_by_dest.pop(destination, {}).values()
+                )
+
                 pending_edus.extend(device_message_edus)
                 if pending_presence:
                     pending_edus.append(
@@ -355,6 +365,8 @@ class TransactionQueue(object):
                     )
                     return
 
+                # END CRITICAL SECTION
+
                 success = yield self._send_new_transaction(
                     destination, pending_pdus, pending_edus, pending_failures,
                     limiter=limiter,