diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-09-07 15:39:13 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-09-07 15:39:13 +0100 |
commit | cb98ac261bbda859574ec33cab934a3269e11e17 (patch) | |
tree | 3bec49e42c63aa2f5d7f1b9f5b2569a4e677b587 | |
parent | Add stream change caches for device messages (diff) | |
download | synapse-cb98ac261bbda859574ec33cab934a3269e11e17.tar.xz |
Move the check for federated device_messages.
Move the check into _attempt_new_transaction. Only delete messages if there were messages to delete.
-rw-r--r-- | synapse/federation/transaction_queue.py | 26 |
1 files changed, 15 insertions, 11 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 233c6606a9..c0ee946ac0 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -177,6 +177,12 @@ class TransactionQueue(object): pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) + + pending_edus.extend(device_message_edus) + if pending_pdus: logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus)) @@ -186,7 +192,9 @@ class TransactionQueue(object): return yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures + destination, pending_pdus, pending_edus, pending_failures, + device_stream_id, + should_delete_from_device_stream=bool(device_message_edus) ) @defer.inlineCallbacks @@ -210,7 +218,8 @@ class TransactionQueue(object): @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, - pending_failures): + pending_failures, device_stream_id, + should_delete_from_device_stream): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -231,12 +240,6 @@ class TransactionQueue(object): self.store, ) - device_message_edus, device_stream_id = ( - yield self._get_new_device_messages(destination) - ) - - edus.extend(device_message_edus) - logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", @@ -327,9 +330,10 @@ class TransactionQueue(object): ) else: # Remove the acknowledged device messages from the database - yield self.store.delete_device_msgs_for_remote( - destination, device_stream_id - ) + if should_delete_from_device_stream: + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) self.last_device_stream_id_by_dest[destination] = device_stream_id except NotRetryingDestination: logger.info( |