summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-09-07 15:39:13 +0100
committerMark Haines <mark.haines@matrix.org>2016-09-07 15:39:13 +0100
commitcb98ac261bbda859574ec33cab934a3269e11e17 (patch)
tree3bec49e42c63aa2f5d7f1b9f5b2569a4e677b587 /synapse/federation/transaction_queue.py
parentAdd stream change caches for device messages (diff)
downloadsynapse-cb98ac261bbda859574ec33cab934a3269e11e17.tar.xz
Move the check for federated device_messages.
Move the check into _attempt_new_transaction.
Only delete messages if there were messages to delete.
Diffstat (limited to '')
-rw-r--r--synapse/federation/transaction_queue.py26
1 files changed, 15 insertions, 11 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 233c6606a9..c0ee946ac0 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -177,6 +177,12 @@ class TransactionQueue(object):
             pending_edus = self.pending_edus_by_dest.pop(destination, [])
             pending_failures = self.pending_failures_by_dest.pop(destination, [])
 
+            device_message_edus, device_stream_id = (
+                yield self._get_new_device_messages(destination)
+            )
+
+            pending_edus.extend(device_message_edus)
+
             if pending_pdus:
                 logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
                              destination, len(pending_pdus))
@@ -186,7 +192,9 @@ class TransactionQueue(object):
                 return
 
             yield self._send_new_transaction(
-                destination, pending_pdus, pending_edus, pending_failures
+                destination, pending_pdus, pending_edus, pending_failures,
+                device_stream_id,
+                should_delete_from_device_stream=bool(device_message_edus)
             )
 
     @defer.inlineCallbacks
@@ -210,7 +218,8 @@ class TransactionQueue(object):
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
     def _send_new_transaction(self, destination, pending_pdus, pending_edus,
-                              pending_failures):
+                              pending_failures, device_stream_id,
+                              should_delete_from_device_stream):
 
             # Sort based on the order field
             pending_pdus.sort(key=lambda t: t[1])
@@ -231,12 +240,6 @@ class TransactionQueue(object):
                     self.store,
                 )
 
-                device_message_edus, device_stream_id = (
-                    yield self._get_new_device_messages(destination)
-                )
-
-                edus.extend(device_message_edus)
-
                 logger.debug(
                     "TX [%s] {%s} Attempting new transaction"
                     " (pdus: %d, edus: %d, failures: %d)",
@@ -327,9 +330,10 @@ class TransactionQueue(object):
                         )
                 else:
                     # Remove the acknowledged device messages from the database
-                    yield self.store.delete_device_msgs_for_remote(
-                        destination, device_stream_id
-                    )
+                    if should_delete_from_device_stream:
+                        yield self.store.delete_device_msgs_for_remote(
+                            destination, device_stream_id
+                        )
                     self.last_device_stream_id_by_dest[destination] = device_stream_id
             except NotRetryingDestination:
                 logger.info(