summary refs log tree commit diff
path: root/synapse/federation/sender
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r--synapse/federation/sender/__init__.py26
-rw-r--r--synapse/federation/sender/per_destination_queue.py10
2 files changed, 27 insertions, 9 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 6106a486d1..30e2421efc 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -118,7 +118,12 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def send_device_messages(self, destination: str) -> None:
+    def send_device_messages(self, destination: str, immediate: bool = True) -> None:
+        """Tells the sender that a new device message is ready to be sent to the
+        destination. The `immediate` flag specifies whether the messages should
+        be tried to be sent immediately, or whether it can be delayed for a
+        short while (to aid performance).
+        """
         raise NotImplementedError()
 
     @abc.abstractmethod
@@ -146,9 +151,8 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
 
 
 @attr.s
-class _PresenceQueue:
-    """A queue of destinations that need to be woken up due to new presence
-    updates.
+class _DestinationWakeupQueue:
+    """A queue of destinations that need to be woken up due to new updates.
 
     Staggers waking up of per destination queues to ensure that we don't attempt
     to start TLS connections with many hosts all at once, leading to pinned CPU.
@@ -175,7 +179,7 @@ class _PresenceQueue:
         if not self.processing:
             self._handle()
 
-    @wrap_as_background_process("_PresenceQueue.handle")
+    @wrap_as_background_process("_DestinationWakeupQueue.handle")
     async def _handle(self) -> None:
         """Background process to drain the queue."""
 
@@ -297,7 +301,7 @@ class FederationSender(AbstractFederationSender):
 
         self._external_cache = hs.get_external_cache()
 
-        self._presence_queue = _PresenceQueue(self, self.clock)
+        self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)
 
     def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
         """Get or create a PerDestinationQueue for the given destination
@@ -614,7 +618,7 @@ class FederationSender(AbstractFederationSender):
                 states, start_loop=False
             )
 
-            self._presence_queue.add_to_queue(destination)
+            self._destination_wakeup_queue.add_to_queue(destination)
 
     def build_and_send_edu(
         self,
@@ -667,7 +671,7 @@ class FederationSender(AbstractFederationSender):
         else:
             queue.send_edu(edu)
 
-    def send_device_messages(self, destination: str) -> None:
+    def send_device_messages(self, destination: str, immediate: bool = False) -> None:
         if destination == self.server_name:
             logger.warning("Not sending device update to ourselves")
             return
@@ -677,7 +681,11 @@ class FederationSender(AbstractFederationSender):
         ):
             return
 
-        self._get_per_destination_queue(destination).attempt_new_transaction()
+        if immediate:
+            self._get_per_destination_queue(destination).attempt_new_transaction()
+        else:
+            self._get_per_destination_queue(destination).mark_new_data()
+            self._destination_wakeup_queue.add_to_queue(destination)
 
     def wake_destination(self, destination: str) -> None:
         """Called when we want to retry sending transactions to a remote.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index c8768f22bc..d80f0ac5e8 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -219,6 +219,16 @@ class PerDestinationQueue:
         self._pending_edus.append(edu)
         self.attempt_new_transaction()
 
+    def mark_new_data(self) -> None:
+        """Marks that the destination has new data to send, without starting a
+        new transaction.
+
+        If a transaction loop is already in progress then a new transcation will
+        be attempted when the current one finishes.
+        """
+
+        self._new_data_to_send = True
+
     def attempt_new_transaction(self) -> None:
         """Try to start a new transaction to this destination