diff options
author | Richard van der Hoff <richard@matrix.org> | 2017-02-20 16:43:29 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2017-02-20 16:46:25 +0000 |
commit | 0c4cf9372b3200325109dc2e4975197aa9b7b7ca (patch) | |
tree | d55aba4882c13192a1f122deebf4e60a0c5c3467 /synapse/federation | |
parent | Merge pull request #1929 from matrix-org/erikj/context_fix (diff) | |
download | synapse-0c4cf9372b3200325109dc2e4975197aa9b7b7ca.tar.xz |
Fix a race in transaction queue
It was theoretically possible for a PDU to get queued and not sent for ages. On closer inspection I think there were bigger problems elsewhere, but we might as well fix this since it's easy.
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/transaction_queue.py | 30 |
1 files changed, 21 insertions, 9 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bb3d9258a6..90235ff098 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -303,18 +303,10 @@ class TransactionQueue(object): try: self.pending_transactions[destination] = 1 + # XXX: what's this for? yield run_on_reactor() while True: - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_presence = self.pending_presence_by_dest.pop(destination, {}) - pending_failures = self.pending_failures_by_dest.pop(destination, []) - - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() - ) - limiter = yield get_retry_limiter( destination, self.clock, @@ -326,6 +318,24 @@ class TransactionQueue(object): yield self._get_new_device_messages(destination) ) + # BEGIN CRITICAL SECTION + # + # In order to avoid a race condition, we need to make sure that + # the following code (from popping the queues up to the point + # where we decide if we actually have any pending messages) is + # atomic - otherwise new PDUs or EDUs might arrive in the + # meantime, but not get sent because we hold the + # pending_transactions flag. + + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_presence = self.pending_presence_by_dest.pop(destination, {}) + pending_failures = self.pending_failures_by_dest.pop(destination, []) + + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + pending_edus.extend(device_message_edus) if pending_presence: pending_edus.append( @@ -355,6 +365,8 @@ class TransactionQueue(object): ) return + # END CRITICAL SECTION + success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, limiter=limiter, |