diff options
author | Erik Johnston <erik@matrix.org> | 2016-09-09 11:44:36 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-09-09 11:44:36 +0100 |
commit | d2688d7f03b006a1a4d340dce04550214ae86185 (patch) | |
tree | 8b4e726df061e9a51a7bae7ed3968238e5de66ce | |
parent | Merge branch 'release-v0.17.3' of github.com:matrix-org/synapse into develop (diff) | |
download | synapse-d2688d7f03b006a1a4d340dce04550214ae86185.tar.xz |
Correctly guard against multiple concurrent transactions
-rw-r--r-- | synapse/federation/transaction_queue.py | 79 |
1 files changed, 41 insertions, 38 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5c7245d383..6900b0121b 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -170,44 +170,53 @@ class TransactionQueue(object): @defer.inlineCallbacks def _attempt_new_transaction(self, destination): - yield run_on_reactor() - while True: - # list of (pending_pdu, deferred, order) - if destination in self.pending_transactions: - # XXX: pending_transactions can get stuck on by a never-ending - # request at which point pending_pdus_by_dest just keeps growing. - # we need application-layer timeouts of some flavour of these - # requests - logger.debug( - "TX [%s] Transaction already in progress", - destination - ) - return + # list of (pending_pdu, deferred, order) + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests + logger.debug( + "TX [%s] Transaction already in progress", + destination + ) + return - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_failures = self.pending_failures_by_dest.pop(destination, []) + try: + self.pending_transactions[destination] = 1 - device_message_edus, device_stream_id = ( - yield self._get_new_device_messages(destination) - ) + yield run_on_reactor() - pending_edus.extend(device_message_edus) + while True: + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_failures = self.pending_failures_by_dest.pop(destination, []) - if pending_pdus: - logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", - destination, len(pending_pdus)) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) - if not pending_pdus and not pending_edus and not pending_failures: - logger.debug("TX [%s] Nothing to send", destination) - self.last_device_stream_id_by_dest[destination] = device_stream_id - return + pending_edus.extend(device_message_edus) - yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures, - device_stream_id, - should_delete_from_device_stream=bool(device_message_edus) - ) + if pending_pdus: + logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) + + if not pending_pdus and not pending_edus and not pending_failures: + logger.debug("TX [%s] Nothing to send", destination) + self.last_device_stream_id_by_dest[destination] = ( + device_stream_id + ) + return + + yield self._send_new_transaction( + destination, pending_pdus, pending_edus, pending_failures, + device_stream_id, + should_delete_from_device_stream=bool(device_message_edus) + ) + finally: + # We want to be *very* sure we delete this after we stop processing + self.pending_transactions.pop(destination, None) @defer.inlineCallbacks def _get_new_device_messages(self, destination): @@ -240,8 +249,6 @@ class TransactionQueue(object): failures = [x.get_dict() for x in pending_failures] try: - self.pending_transactions[destination] = 1 - logger.debug("TX [%s] _attempt_new_transaction", destination) txn_id = str(self._next_txn_id) @@ -375,7 +382,3 @@ class TransactionQueue(object): for p in pdus: logger.info("Failed to send event %s to %s", p.event_id, destination) - - finally: - # We want to be *very* sure we delete this after we stop processing - self.pending_transactions.pop(destination, None) |