diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5ae427d52c..763f56dfc1 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -836,17 +836,16 @@ class DeviceHandler(DeviceWorkerHandler):
user_id,
hosts,
)
- for host in hosts:
- self.federation_sender.send_device_messages(
- host, immediate=False
- )
- # TODO: when called, this isn't in a logging context.
- # This leads to log spam, sentry event spam, and massive
- # memory usage.
- # See https://github.com/matrix-org/synapse/issues/12552.
- # log_kv(
- # {"message": "sent device update to host", "host": host}
- # )
+ await self.federation_sender.send_device_messages(
+ hosts, immediate=False
+ )
+ # TODO: when called, this isn't in a logging context.
+ # This leads to log spam, sentry event spam, and massive
+ # memory usage.
+ # See https://github.com/matrix-org/synapse/issues/12552.
+ # log_kv(
+ # {"message": "sent device update to host", "host": host}
+ # )
if current_stream_id != stream_id:
# Clear the set of hosts we've already sent to as we're
@@ -951,8 +950,9 @@ class DeviceHandler(DeviceWorkerHandler):
# Notify things that device lists need to be sent out.
self.notifier.notify_replication()
- for host in potentially_changed_hosts:
- self.federation_sender.send_device_messages(host, immediate=False)
+ await self.federation_sender.send_device_messages(
+ potentially_changed_hosts, immediate=False
+ )
def _update_device_from_client_ips(
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 798c7039f9..1c79f7a61e 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -302,10 +302,9 @@ class DeviceMessageHandler:
)
if self.federation_sender:
- for destination in remote_messages.keys():
- # Enqueue a new federation transaction to send the new
- # device messages to each remote destination.
- self.federation_sender.send_device_messages(destination)
+ # Enqueue a new federation transaction to send the new
+ # device messages to each remote destination.
+ await self.federation_sender.send_device_messages(remote_messages.keys())
async def get_events_for_dehydrated_device(
self,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2f841863ae..f31e18328b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -354,7 +354,9 @@ class BasePresenceHandler(abc.ABC):
)
for destination, host_states in hosts_to_states.items():
- self._federation.send_presence_to_destinations(host_states, [destination])
+ await self._federation.send_presence_to_destinations(
+ host_states, [destination]
+ )
async def send_full_presence_to_users(self, user_ids: StrCollection) -> None:
"""
@@ -936,7 +938,7 @@ class PresenceHandler(BasePresenceHandler):
)
for destination, states in hosts_to_states.items():
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
states, [destination]
)
@@ -1508,7 +1510,7 @@ class PresenceHandler(BasePresenceHandler):
or state.status_msg is not None
]
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
destinations=newly_joined_remote_hosts,
states=states,
)
@@ -1519,7 +1521,7 @@ class PresenceHandler(BasePresenceHandler):
prev_remote_hosts or newly_joined_remote_hosts
):
local_states = await self.current_state_for_users(newly_joined_local_users)
- self._federation_queue.send_presence_to_destinations(
+ await self._federation_queue.send_presence_to_destinations(
destinations=prev_remote_hosts | newly_joined_remote_hosts,
states=list(local_states.values()),
)
@@ -2182,7 +2184,7 @@ class PresenceFederationQueue:
index = bisect(self._queue, (clear_before,))
self._queue = self._queue[index:]
- def send_presence_to_destinations(
+ async def send_presence_to_destinations(
self, states: Collection[UserPresenceState], destinations: StrCollection
) -> None:
"""Send the presence states to the given destinations.
@@ -2202,7 +2204,7 @@ class PresenceFederationQueue:
return
if self._federation:
- self._federation.send_presence_to_destinations(
+ await self._federation.send_presence_to_destinations(
states=states,
destinations=destinations,
)
@@ -2325,7 +2327,7 @@ class PresenceFederationQueue:
for host, user_ids in hosts_to_users.items():
states = await self._presence_handler.current_state_for_users(user_ids)
- self._federation.send_presence_to_destinations(
+ await self._federation.send_presence_to_destinations(
states=states.values(),
destinations=[host],
)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 7aeae5319c..4b4227003d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -26,9 +26,10 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.replication.tcp.streams import TypingStream
from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, StreamKeyType, UserID
+from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
+from synapse.util.retryutils import filter_destinations_by_retry_limiter
from synapse.util.wheel_timer import WheelTimer
if TYPE_CHECKING:
@@ -150,8 +151,15 @@ class FollowerTypingHandler:
now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
)
- hosts = await self._storage_controllers.state.get_current_hosts_in_room(
- member.room_id
+ hosts: StrCollection = (
+ await self._storage_controllers.state.get_current_hosts_in_room(
+ member.room_id
+ )
+ )
+ hosts = await filter_destinations_by_retry_limiter(
+ hosts,
+ clock=self.clock,
+ store=self.store,
)
for domain in hosts:
if not self.is_mine_server_name(domain):
|