diff options
author | Richard van der Hoff <github@rvanderhoff.org.uk> | 2017-02-21 08:46:38 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-02-21 08:46:38 +0000 |
commit | c927d6de9bce64999e3e3b850031caa87629c391 (patch) | |
tree | d55aba4882c13192a1f122deebf4e60a0c5c3467 | |
parent | Merge pull request #1929 from matrix-org/erikj/context_fix (diff) | |
parent | Fix a race in transaction queue (diff) | |
download | synapse-c927d6de9bce64999e3e3b850031caa87629c391.tar.xz |
Merge pull request #1930 from matrix-org/rav/fix_txnq_race
Fix a race in transaction queue
-rw-r--r-- | synapse/federation/transaction_queue.py | 30 |
1 files changed, 21 insertions, 9 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bb3d9258a6..90235ff098 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -303,18 +303,10 @@ class TransactionQueue(object): try: self.pending_transactions[destination] = 1 + # XXX: what's this for? yield run_on_reactor() while True: - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_presence = self.pending_presence_by_dest.pop(destination, {}) - pending_failures = self.pending_failures_by_dest.pop(destination, []) - - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() - ) - limiter = yield get_retry_limiter( destination, self.clock, @@ -326,6 +318,24 @@ class TransactionQueue(object): yield self._get_new_device_messages(destination) ) + # BEGIN CRITICAL SECTION + # + # In order to avoid a race condition, we need to make sure that + # the following code (from popping the queues up to the point + # where we decide if we actually have any pending messages) is + # atomic - otherwise new PDUs or EDUs might arrive in the + # meantime, but not get sent because we hold the + # pending_transactions flag. + + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_presence = self.pending_presence_by_dest.pop(destination, {}) + pending_failures = self.pending_failures_by_dest.pop(destination, []) + + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + pending_edus.extend(device_message_edus) if pending_presence: pending_edus.append( @@ -355,6 +365,8 @@ class TransactionQueue(object): ) return + # END CRITICAL SECTION + success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, limiter=limiter, |