summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-09-04 17:14:09 +0100
committerGitHub <noreply@github.com>2023-09-04 17:14:09 +0100
commitd35bed8369514fe727b4fe1afb68f48cc8b2655a (patch)
tree7343ece3b82ac87a594865c4074623b45b0297b4 /synapse/handlers
parentAdd last_seen_ts to the admin users API (#16218) (diff)
downloadsynapse-d35bed8369514fe727b4fe1afb68f48cc8b2655a.tar.xz
Don't wake up destination transaction queue if they're not due for retry. (#16223)
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/device.py26
-rw-r--r--synapse/handlers/devicemessage.py7
-rw-r--r--synapse/handlers/presence.py16
-rw-r--r--synapse/handlers/typing.py14
4 files changed, 36 insertions, 27 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5ae427d52c..763f56dfc1 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -836,17 +836,16 @@ class DeviceHandler(DeviceWorkerHandler):
                             user_id,
                             hosts,
                         )
-                        for host in hosts:
-                            self.federation_sender.send_device_messages(
-                                host, immediate=False
-                            )
-                            # TODO: when called, this isn't in a logging context.
-                            # This leads to log spam, sentry event spam, and massive
-                            # memory usage.
-                            # See https://github.com/matrix-org/synapse/issues/12552.
-                            # log_kv(
-                            #     {"message": "sent device update to host", "host": host}
-                            # )
+                        await self.federation_sender.send_device_messages(
+                            hosts, immediate=False
+                        )
+                        # TODO: when called, this isn't in a logging context.
+                        # This leads to log spam, sentry event spam, and massive
+                        # memory usage.
+                        # See https://github.com/matrix-org/synapse/issues/12552.
+                        # log_kv(
+                        #     {"message": "sent device update to host", "host": host}
+                        # )
 
                     if current_stream_id != stream_id:
                         # Clear the set of hosts we've already sent to as we're
@@ -951,8 +950,9 @@ class DeviceHandler(DeviceWorkerHandler):
 
         # Notify things that device lists need to be sent out.
         self.notifier.notify_replication()
-        for host in potentially_changed_hosts:
-            self.federation_sender.send_device_messages(host, immediate=False)
+        await self.federation_sender.send_device_messages(
+            potentially_changed_hosts, immediate=False
+        )
 
 
 def _update_device_from_client_ips(
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 798c7039f9..1c79f7a61e 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -302,10 +302,9 @@ class DeviceMessageHandler:
         )
 
         if self.federation_sender:
-            for destination in remote_messages.keys():
-                # Enqueue a new federation transaction to send the new
-                # device messages to each remote destination.
-                self.federation_sender.send_device_messages(destination)
+            # Enqueue a new federation transaction to send the new
+            # device messages to each remote destination.
+            await self.federation_sender.send_device_messages(remote_messages.keys())
 
     async def get_events_for_dehydrated_device(
         self,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2f841863ae..f31e18328b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -354,7 +354,9 @@ class BasePresenceHandler(abc.ABC):
         )
 
         for destination, host_states in hosts_to_states.items():
-            self._federation.send_presence_to_destinations(host_states, [destination])
+            await self._federation.send_presence_to_destinations(
+                host_states, [destination]
+            )
 
     async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
         """
@@ -936,7 +938,7 @@ class PresenceHandler(BasePresenceHandler):
                 )
 
                 for destination, states in hosts_to_states.items():
-                    self._federation_queue.send_presence_to_destinations(
+                    await self._federation_queue.send_presence_to_destinations(
                         states, [destination]
                     )
 
@@ -1508,7 +1510,7 @@ class PresenceHandler(BasePresenceHandler):
                 or state.status_msg is not None
             ]
 
-            self._federation_queue.send_presence_to_destinations(
+            await self._federation_queue.send_presence_to_destinations(
                 destinations=newly_joined_remote_hosts,
                 states=states,
             )
@@ -1519,7 +1521,7 @@ class PresenceHandler(BasePresenceHandler):
             prev_remote_hosts or newly_joined_remote_hosts
         ):
             local_states = await self.current_state_for_users(newly_joined_local_users)
-            self._federation_queue.send_presence_to_destinations(
+            await self._federation_queue.send_presence_to_destinations(
                 destinations=prev_remote_hosts | newly_joined_remote_hosts,
                 states=list(local_states.values()),
             )
@@ -2182,7 +2184,7 @@ class PresenceFederationQueue:
         index = bisect(self._queue, (clear_before,))
         self._queue = self._queue[index:]
 
-    def send_presence_to_destinations(
+    async def send_presence_to_destinations(
         self, states: Collection[UserPresenceState], destinations: StrCollection
     ) -> None:
         """Send the presence states to the given destinations.
@@ -2202,7 +2204,7 @@ class PresenceFederationQueue:
             return
 
         if self._federation:
-            self._federation.send_presence_to_destinations(
+            await self._federation.send_presence_to_destinations(
                 states=states,
                 destinations=destinations,
             )
@@ -2325,7 +2327,7 @@ class PresenceFederationQueue:
 
         for host, user_ids in hosts_to_users.items():
             states = await self._presence_handler.current_state_for_users(user_ids)
-            self._federation.send_presence_to_destinations(
+            await self._federation.send_presence_to_destinations(
                 states=states.values(),
                 destinations=[host],
             )
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 7aeae5319c..4b4227003d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -26,9 +26,10 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.replication.tcp.streams import TypingStream
 from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, StreamKeyType, UserID
+from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.metrics import Measure
+from synapse.util.retryutils import filter_destinations_by_retry_limiter
 from synapse.util.wheel_timer import WheelTimer
 
 if TYPE_CHECKING:
@@ -150,8 +151,15 @@ class FollowerTypingHandler:
                 now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
             )
 
-            hosts = await self._storage_controllers.state.get_current_hosts_in_room(
-                member.room_id
+            hosts: StrCollection = (
+                await self._storage_controllers.state.get_current_hosts_in_room(
+                    member.room_id
+                )
+            )
+            hosts = await filter_destinations_by_retry_limiter(
+                hosts,
+                clock=self.clock,
+                store=self.store,
             )
             for domain in hosts:
                 if not self.is_mine_server_name(domain):