Don't wake up destination transaction queue if they're not due for retry. (#16223)
1 files changed, 3 insertions, 5 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 3b88dc68ea..51285e6d33 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -422,7 +422,7 @@ class FederationSenderHandler:
# The federation stream contains things that we want to send out, e.g.
# presence, typing, etc.
if stream_name == "federation":
- send_queue.process_rows_for_federation(self.federation_sender, rows)
+ await send_queue.process_rows_for_federation(self.federation_sender, rows)
await self.update_token(token)
# ... and when new receipts happen
@@ -439,16 +439,14 @@ class FederationSenderHandler:
for row in rows
if not row.entity.startswith("@") and not row.is_signature
}
- for host in hosts:
- self.federation_sender.send_device_messages(host, immediate=False)
+ await self.federation_sender.send_device_messages(hosts, immediate=False)
elif stream_name == ToDeviceStream.NAME:
# The to_device stream includes stuff to be pushed to both local
# clients and remote servers, so we ignore entities that start with
# '@' (since they'll be local users rather than destinations).
hosts = {row.entity for row in rows if not row.entity.startswith("@")}
- for host in hosts:
- self.federation_sender.send_device_messages(host)
+ await self.federation_sender.send_device_messages(hosts)
async def _on_new_receipts(
self, rows: Iterable[ReceiptsStream.ReceiptsStreamRow]
|