summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2016-09-09 11:49:06 +0100
committerGitHub <noreply@github.com>2016-09-09 11:49:06 +0100
commita15ba15e641714b7d2d89de94e967427a1b22163 (patch)
treeaad1d059eab29d356dc4cec45e0b8655b25e2286 /synapse/federation/transaction_queue.py
parentMerge pull request #1087 from matrix-org/markjh/reapply_delta (diff)
parentCorrectly guard against multiple concurrent transactions (diff)
downloadsynapse-a15ba15e641714b7d2d89de94e967427a1b22163.tar.xz
Merge pull request #1088 from matrix-org/erikj/transaction_queue_check
Correctly guard against multiple concurrent transactions
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py79
1 files changed, 41 insertions, 38 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5c7245d383..6900b0121b 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -170,44 +170,53 @@ class TransactionQueue(object):
 
     @defer.inlineCallbacks
     def _attempt_new_transaction(self, destination):
-        yield run_on_reactor()
-        while True:
-            # list of (pending_pdu, deferred, order)
-            if destination in self.pending_transactions:
-                # XXX: pending_transactions can get stuck on by a never-ending
-                # request at which point pending_pdus_by_dest just keeps growing.
-                # we need application-layer timeouts of some flavour of these
-                # requests
-                logger.debug(
-                    "TX [%s] Transaction already in progress",
-                    destination
-                )
-                return
+        # list of (pending_pdu, deferred, order)
+        if destination in self.pending_transactions:
+            # XXX: pending_transactions can get stuck on by a never-ending
+            # request at which point pending_pdus_by_dest just keeps growing.
+            # we need application-layer timeouts of some flavour of these
+            # requests
+            logger.debug(
+                "TX [%s] Transaction already in progress",
+                destination
+            )
+            return
 
-            pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
-            pending_edus = self.pending_edus_by_dest.pop(destination, [])
-            pending_failures = self.pending_failures_by_dest.pop(destination, [])
+        try:
+            self.pending_transactions[destination] = 1
 
-            device_message_edus, device_stream_id = (
-                yield self._get_new_device_messages(destination)
-            )
+            yield run_on_reactor()
 
-            pending_edus.extend(device_message_edus)
+            while True:
+                    pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+                    pending_edus = self.pending_edus_by_dest.pop(destination, [])
+                    pending_failures = self.pending_failures_by_dest.pop(destination, [])
 
-            if pending_pdus:
-                logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
-                             destination, len(pending_pdus))
+                    device_message_edus, device_stream_id = (
+                        yield self._get_new_device_messages(destination)
+                    )
 
-            if not pending_pdus and not pending_edus and not pending_failures:
-                logger.debug("TX [%s] Nothing to send", destination)
-                self.last_device_stream_id_by_dest[destination] = device_stream_id
-                return
+                    pending_edus.extend(device_message_edus)
 
-            yield self._send_new_transaction(
-                destination, pending_pdus, pending_edus, pending_failures,
-                device_stream_id,
-                should_delete_from_device_stream=bool(device_message_edus)
-            )
+                    if pending_pdus:
+                        logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+                                     destination, len(pending_pdus))
+
+                    if not pending_pdus and not pending_edus and not pending_failures:
+                        logger.debug("TX [%s] Nothing to send", destination)
+                        self.last_device_stream_id_by_dest[destination] = (
+                            device_stream_id
+                        )
+                        return
+
+                    yield self._send_new_transaction(
+                        destination, pending_pdus, pending_edus, pending_failures,
+                        device_stream_id,
+                        should_delete_from_device_stream=bool(device_message_edus)
+                    )
+        finally:
+            # We want to be *very* sure we delete this after we stop processing
+            self.pending_transactions.pop(destination, None)
 
     @defer.inlineCallbacks
     def _get_new_device_messages(self, destination):
@@ -240,8 +249,6 @@ class TransactionQueue(object):
             failures = [x.get_dict() for x in pending_failures]
 
             try:
-                self.pending_transactions[destination] = 1
-
                 logger.debug("TX [%s] _attempt_new_transaction", destination)
 
                 txn_id = str(self._next_txn_id)
@@ -375,7 +382,3 @@ class TransactionQueue(object):
 
                 for p in pdus:
                     logger.info("Failed to send event %s to %s", p.event_id, destination)
-
-            finally:
-                # We want to be *very* sure we delete this after we stop processing
-                self.pending_transactions.pop(destination, None)