summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_server.py2
-rw-r--r--synapse/federation/transaction_queue.py43
2 files changed, 37 insertions, 8 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 5621655098..3fa7b2315c 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -188,7 +188,7 @@ class FederationServer(FederationBase):
             except SynapseError as e:
                 logger.info("Failed to handle edu %r: %r", edu_type, e)
             except Exception as e:
-                logger.exception("Failed to handle edu %r", edu_type, e)
+                logger.exception("Failed to handle edu %r", edu_type)
         else:
             logger.warn("Received EDU of type %s with no handler", edu_type)
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index cb2ef0210c..5e86141f86 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -17,7 +17,7 @@
 from twisted.internet import defer
 
 from .persistence import TransactionActions
-from .units import Transaction
+from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException
 from synapse.util.async import run_on_reactor
@@ -187,6 +187,24 @@ class TransactionQueue(object):
                 destination, pending_pdus, pending_edus, pending_failures
             )
 
+    @defer.inlineCallbacks
+    def _get_new_device_messages(self, destination):
+        last_device_stream_id = 0
+        to_device_stream_id = self.store.get_to_device_stream_token()
+        contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
+            destination, last_device_stream_id, to_device_stream_id
+        )
+        edus = [
+            Edu(
+                origin=self.server_name,
+                destination=destination,
+                edu_type="m.direct_to_device",
+                content=content,
+            )
+            for content in contents
+        ]
+        defer.returnValue((edus, stream_id))
+
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
     def _send_new_transaction(self, destination, pending_pdus, pending_edus,
@@ -211,13 +229,19 @@ class TransactionQueue(object):
                     self.store,
                 )
 
+                device_message_edus, device_stream_id = (
+                    yield self._get_new_device_messages(destination)
+                )
+
+                edus.extend(device_message_edus)
+
                 logger.debug(
                     "TX [%s] {%s} Attempting new transaction"
                     " (pdus: %d, edus: %d, failures: %d)",
                     destination, txn_id,
-                    len(pending_pdus),
-                    len(pending_edus),
-                    len(pending_failures)
+                    len(pdus),
+                    len(edus),
+                    len(failures)
                 )
 
                 logger.debug("TX [%s] Persisting transaction...", destination)
@@ -242,9 +266,9 @@ class TransactionQueue(object):
                     " (PDUs: %d, EDUs: %d, failures: %d)",
                     destination, txn_id,
                     transaction.transaction_id,
-                    len(pending_pdus),
-                    len(pending_edus),
-                    len(pending_failures),
+                    len(pdus),
+                    len(edus),
+                    len(failures),
                 )
 
                 with limiter:
@@ -299,6 +323,11 @@ class TransactionQueue(object):
                         logger.info(
                             "Failed to send event %s to %s", p.event_id, destination
                         )
+                else:
+                    # Remove the acknowledged device messages from the database
+                    yield self.store.delete_device_msgs_for_remote(
+                        destination, device_stream_id
+                    )
             except NotRetryingDestination:
                 logger.info(
                     "TX [%s] not ready for retry yet - "