summary refs log tree commit diff
path: root/synapse/federation/send_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-09-04 17:14:09 +0100
committerGitHub <noreply@github.com>2023-09-04 17:14:09 +0100
commitd35bed8369514fe727b4fe1afb68f48cc8b2655a (patch)
tree7343ece3b82ac87a594865c4074623b45b0297b4 /synapse/federation/send_queue.py
parentAdd last_seen_ts to the admin users API (#16218) (diff)
downloadsynapse-d35bed8369514fe727b4fe1afb68f48cc8b2655a.tar.xz
Don't wake up destination transaction queue if they're not due for retry. (#16223)
Diffstat (limited to 'synapse/federation/send_queue.py')
-rw-r--r--synapse/federation/send_queue.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index fb448f2155..6520795635 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -49,7 +49,7 @@ from synapse.api.presence import UserPresenceState
 from synapse.federation.sender import AbstractFederationSender, FederationSender
 from synapse.metrics import LaterGauge
 from synapse.replication.tcp.streams.federation import FederationStream
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
 from synapse.util.metrics import Measure
 
 from .units import Edu
@@ -229,7 +229,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         """
         # nothing to do here: the replication listener will handle it.
 
-    def send_presence_to_destinations(
+    async def send_presence_to_destinations(
         self, states: Iterable[UserPresenceState], destinations: Iterable[str]
     ) -> None:
         """As per FederationSender
@@ -245,7 +245,9 @@ class FederationRemoteSendQueue(AbstractFederationSender):
 
         self.notifier.on_new_replication_data()
 
-    def send_device_messages(self, destination: str, immediate: bool = True) -> None:
+    async def send_device_messages(
+        self, destinations: StrCollection, immediate: bool = True
+    ) -> None:
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
         # stream.
@@ -463,7 +465,7 @@ class ParsedFederationStreamData:
     edus: Dict[str, List[Edu]]
 
 
-def process_rows_for_federation(
+async def process_rows_for_federation(
     transaction_queue: FederationSender,
     rows: List[FederationStream.FederationStreamRow],
 ) -> None:
@@ -496,7 +498,7 @@ def process_rows_for_federation(
         parsed_row.add_to_buffer(buff)
 
     for state, destinations in buff.presence_destinations:
-        transaction_queue.send_presence_to_destinations(
+        await transaction_queue.send_presence_to_destinations(
             states=[state], destinations=destinations
         )