summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/send_queue.py2
-rw-r--r--synapse/federation/sender/__init__.py26
-rw-r--r--synapse/federation/sender/per_destination_queue.py10
-rw-r--r--synapse/handlers/device.py2
-rw-r--r--synapse/replication/tcp/client.py2
5 files changed, 30 insertions, 12 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 0d7c4f5067..d720b5fd3f 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -244,7 +244,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
 
         self.notifier.on_new_replication_data()
 
-    def send_device_messages(self, destination: str) -> None:
+    def send_device_messages(self, destination: str, immediate: bool = False) -> None:
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
         # stream.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 6106a486d1..30e2421efc 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -118,7 +118,12 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def send_device_messages(self, destination: str) -> None:
+    def send_device_messages(self, destination: str, immediate: bool = True) -> None:
+        """Tells the sender that a new device message is ready to be sent to the
+        destination. The `immediate` flag specifies whether the messages should
+        be tried to be sent immediately, or whether it can be delayed for a
+        short while (to aid performance).
+        """
         raise NotImplementedError()
 
     @abc.abstractmethod
@@ -146,9 +151,8 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
 
 
 @attr.s
-class _PresenceQueue:
-    """A queue of destinations that need to be woken up due to new presence
-    updates.
+class _DestinationWakeupQueue:
+    """A queue of destinations that need to be woken up due to new updates.
 
     Staggers waking up of per destination queues to ensure that we don't attempt
     to start TLS connections with many hosts all at once, leading to pinned CPU.
@@ -175,7 +179,7 @@ class _PresenceQueue:
         if not self.processing:
             self._handle()
 
-    @wrap_as_background_process("_PresenceQueue.handle")
+    @wrap_as_background_process("_DestinationWakeupQueue.handle")
     async def _handle(self) -> None:
         """Background process to drain the queue."""
 
@@ -297,7 +301,7 @@ class FederationSender(AbstractFederationSender):
 
         self._external_cache = hs.get_external_cache()
 
-        self._presence_queue = _PresenceQueue(self, self.clock)
+        self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)
 
     def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
         """Get or create a PerDestinationQueue for the given destination
@@ -614,7 +618,7 @@ class FederationSender(AbstractFederationSender):
                 states, start_loop=False
             )
 
-            self._presence_queue.add_to_queue(destination)
+            self._destination_wakeup_queue.add_to_queue(destination)
 
     def build_and_send_edu(
         self,
@@ -667,7 +671,7 @@ class FederationSender(AbstractFederationSender):
         else:
             queue.send_edu(edu)
 
-    def send_device_messages(self, destination: str) -> None:
+    def send_device_messages(self, destination: str, immediate: bool = False) -> None:
         if destination == self.server_name:
             logger.warning("Not sending device update to ourselves")
             return
@@ -677,7 +681,11 @@ class FederationSender(AbstractFederationSender):
         ):
             return
 
-        self._get_per_destination_queue(destination).attempt_new_transaction()
+        if immediate:
+            self._get_per_destination_queue(destination).attempt_new_transaction()
+        else:
+            self._get_per_destination_queue(destination).mark_new_data()
+            self._destination_wakeup_queue.add_to_queue(destination)
 
     def wake_destination(self, destination: str) -> None:
         """Called when we want to retry sending transactions to a remote.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index c8768f22bc..d80f0ac5e8 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -219,6 +219,16 @@ class PerDestinationQueue:
         self._pending_edus.append(edu)
         self.attempt_new_transaction()
 
+    def mark_new_data(self) -> None:
+        """Marks that the destination has new data to send, without starting a
+        new transaction.
+
+        If a transaction loop is already in progress then a new transcation will
+        be attempted when the current one finishes.
+        """
+
+        self._new_data_to_send = True
+
     def attempt_new_transaction(self) -> None:
         """Try to start a new transaction to this destination
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 934b5bd734..d90cb259a6 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -506,7 +506,7 @@ class DeviceHandler(DeviceWorkerHandler):
                 "Sending device list update notif for %r to: %r", user_id, hosts
             )
             for host in hosts:
-                self.federation_sender.send_device_messages(host)
+                self.federation_sender.send_device_messages(host, immediate=False)
                 log_kv({"message": "sent device update to host", "host": host})
 
     async def notify_user_signature_update(
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 1b8479b0b4..b8fc1d4db9 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -380,7 +380,7 @@ class FederationSenderHandler:
             # changes.
             hosts = {row.entity for row in rows if not row.entity.startswith("@")}
             for host in hosts:
-                self.federation_sender.send_device_messages(host)
+                self.federation_sender.send_device_messages(host, immediate=False)
 
         elif stream_name == ToDeviceStream.NAME:
             # The to_device stream includes stuff to be pushed to both local