diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 233c6606a9..c0ee946ac0 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -177,6 +177,12 @@ class TransactionQueue(object):
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_failures = self.pending_failures_by_dest.pop(destination, [])
+ device_message_edus, device_stream_id = (
+ yield self._get_new_device_messages(destination)
+ )
+
+ pending_edus.extend(device_message_edus)
+
if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus))
@@ -186,7 +192,9 @@ class TransactionQueue(object):
return
yield self._send_new_transaction(
- destination, pending_pdus, pending_edus, pending_failures
+ destination, pending_pdus, pending_edus, pending_failures,
+ device_stream_id,
+ should_delete_from_device_stream=bool(device_message_edus)
)
@defer.inlineCallbacks
@@ -210,7 +218,8 @@ class TransactionQueue(object):
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
- pending_failures):
+ pending_failures, device_stream_id,
+ should_delete_from_device_stream):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
@@ -231,12 +240,6 @@ class TransactionQueue(object):
self.store,
)
- device_message_edus, device_stream_id = (
- yield self._get_new_device_messages(destination)
- )
-
- edus.extend(device_message_edus)
-
logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
@@ -327,9 +330,10 @@ class TransactionQueue(object):
)
else:
# Remove the acknowledged device messages from the database
- yield self.store.delete_device_msgs_for_remote(
- destination, device_stream_id
- )
+ if should_delete_from_device_stream:
+ yield self.store.delete_device_msgs_for_remote(
+ destination, device_stream_id
+ )
self.last_device_stream_id_by_dest[destination] = device_stream_id
except NotRetryingDestination:
logger.info(
|