diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 5621655098..3fa7b2315c 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -188,7 +188,7 @@ class FederationServer(FederationBase):
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception as e:
- logger.exception("Failed to handle edu %r", edu_type, e)
+ logger.exception("Failed to handle edu %r", edu_type)
else:
logger.warn("Received EDU of type %s with no handler", edu_type)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index cb2ef0210c..5e86141f86 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -17,7 +17,7 @@
from twisted.internet import defer
from .persistence import TransactionActions
-from .units import Transaction
+from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
@@ -187,6 +187,24 @@ class TransactionQueue(object):
destination, pending_pdus, pending_edus, pending_failures
)
+ @defer.inlineCallbacks
+ def _get_new_device_messages(self, destination):
+ last_device_stream_id = 0
+ to_device_stream_id = self.store.get_to_device_stream_token()
+ contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
+ destination, last_device_stream_id, to_device_stream_id
+ )
+ edus = [
+ Edu(
+ origin=self.server_name,
+ destination=destination,
+ edu_type="m.direct_to_device",
+ content=content,
+ )
+ for content in contents
+ ]
+ defer.returnValue((edus, stream_id))
+
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
@@ -211,13 +229,19 @@ class TransactionQueue(object):
self.store,
)
+ device_message_edus, device_stream_id = (
+ yield self._get_new_device_messages(destination)
+ )
+
+ edus.extend(device_message_edus)
+
logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
destination, txn_id,
- len(pending_pdus),
- len(pending_edus),
- len(pending_failures)
+ len(pdus),
+ len(edus),
+ len(failures)
)
logger.debug("TX [%s] Persisting transaction...", destination)
@@ -242,9 +266,9 @@ class TransactionQueue(object):
" (PDUs: %d, EDUs: %d, failures: %d)",
destination, txn_id,
transaction.transaction_id,
- len(pending_pdus),
- len(pending_edus),
- len(pending_failures),
+ len(pdus),
+ len(edus),
+ len(failures),
)
with limiter:
@@ -299,6 +323,11 @@ class TransactionQueue(object):
logger.info(
"Failed to send event %s to %s", p.event_id, destination
)
+ else:
+ # Remove the acknowledged device messages from the database
+ yield self.store.delete_device_msgs_for_remote(
+ destination, device_stream_id
+ )
except NotRetryingDestination:
logger.info(
"TX [%s] not ready for retry yet - "
|