From 0c4cf9372b3200325109dc2e4975197aa9b7b7ca Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 20 Feb 2017 16:43:29 +0000 Subject: Fix a race in transaction queue It was theoretically possible for a PDU to get queued and not sent for ages. On closer inspection I think there were bigger problems elsewhere, but we might as well fix this since it's easy. --- synapse/federation/transaction_queue.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bb3d9258a6..90235ff098 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -303,18 +303,10 @@ class TransactionQueue(object): try: self.pending_transactions[destination] = 1 + # XXX: what's this for? yield run_on_reactor() while True: - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_presence = self.pending_presence_by_dest.pop(destination, {}) - pending_failures = self.pending_failures_by_dest.pop(destination, []) - - pending_edus.extend( - self.pending_edus_keyed_by_dest.pop(destination, {}).values() - ) - limiter = yield get_retry_limiter( destination, self.clock, @@ -326,6 +318,24 @@ class TransactionQueue(object): yield self._get_new_device_messages(destination) ) + # BEGIN CRITICAL SECTION + # + # In order to avoid a race condition, we need to make sure that + # the following code (from popping the queues up to the point + # where we decide if we actually have any pending messages) is + # atomic - otherwise new PDUs or EDUs might arrive in the + # meantime, but not get sent because we hold the + # pending_transactions flag. + + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_presence = self.pending_presence_by_dest.pop(destination, {}) + pending_failures = self.pending_failures_by_dest.pop(destination, []) + + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + pending_edus.extend(device_message_edus) if pending_presence: pending_edus.append( @@ -355,6 +365,8 @@ class TransactionQueue(object): ) return + # END CRITICAL SECTION + success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, limiter=limiter, -- cgit 1.4.1