Don't wake up destination transaction queue if they're not due for retry. (#16223)
1 files changed, 3 insertions, 5 deletions
| diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 3b88dc68ea..51285e6d33 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -422,7 +422,7 @@ class FederationSenderHandler:
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.
         if stream_name == "federation":
-            send_queue.process_rows_for_federation(self.federation_sender, rows)
+            await send_queue.process_rows_for_federation(self.federation_sender, rows)
             await self.update_token(token)
 
         # ... and when new receipts happen
@@ -439,16 +439,14 @@ class FederationSenderHandler:
                 for row in rows
                 if not row.entity.startswith("@") and not row.is_signature
             }
-            for host in hosts:
-                self.federation_sender.send_device_messages(host, immediate=False)
+            await self.federation_sender.send_device_messages(hosts, immediate=False)
 
         elif stream_name == ToDeviceStream.NAME:
             # The to_device stream includes stuff to be pushed to both local
             # clients and remote servers, so we ignore entities that start with
             # '@' (since they'll be local users rather than destinations).
             hosts = {row.entity for row in rows if not row.entity.startswith("@")}
-            for host in hosts:
-                self.federation_sender.send_device_messages(host)
+            await self.federation_sender.send_device_messages(hosts)
 
     async def _on_new_receipts(
         self, rows: Iterable[ReceiptsStream.ReceiptsStreamRow]
 |