diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/device.py | 26 | ||||
-rw-r--r-- | synapse/handlers/devicemessage.py | 7 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 16 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 14 |
4 files changed, 36 insertions, 27 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5ae427d52c..763f56dfc1 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -836,17 +836,16 @@ class DeviceHandler(DeviceWorkerHandler): user_id, hosts, ) - for host in hosts: - self.federation_sender.send_device_messages( - host, immediate=False - ) - # TODO: when called, this isn't in a logging context. - # This leads to log spam, sentry event spam, and massive - # memory usage. - # See https://github.com/matrix-org/synapse/issues/12552. - # log_kv( - # {"message": "sent device update to host", "host": host} - # ) + await self.federation_sender.send_device_messages( + hosts, immediate=False + ) + # TODO: when called, this isn't in a logging context. + # This leads to log spam, sentry event spam, and massive + # memory usage. + # See https://github.com/matrix-org/synapse/issues/12552. + # log_kv( + # {"message": "sent device update to host", "host": host} + # ) if current_stream_id != stream_id: # Clear the set of hosts we've already sent to as we're @@ -951,8 +950,9 @@ class DeviceHandler(DeviceWorkerHandler): # Notify things that device lists need to be sent out. self.notifier.notify_replication() - for host in potentially_changed_hosts: - self.federation_sender.send_device_messages(host, immediate=False) + await self.federation_sender.send_device_messages( + potentially_changed_hosts, immediate=False + ) def _update_device_from_client_ips( diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 798c7039f9..1c79f7a61e 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -302,10 +302,9 @@ class DeviceMessageHandler: ) if self.federation_sender: - for destination in remote_messages.keys(): - # Enqueue a new federation transaction to send the new - # device messages to each remote destination. - self.federation_sender.send_device_messages(destination) + # Enqueue a new federation transaction to send the new + # device messages to each remote destination. + await self.federation_sender.send_device_messages(remote_messages.keys()) async def get_events_for_dehydrated_device( self, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 2f841863ae..f31e18328b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -354,7 +354,9 @@ class BasePresenceHandler(abc.ABC): ) for destination, host_states in hosts_to_states.items(): - self._federation.send_presence_to_destinations(host_states, [destination]) + await self._federation.send_presence_to_destinations( + host_states, [destination] + ) async def send_full_presence_to_users(self, user_ids: StrCollection) -> None: """ @@ -936,7 +938,7 @@ class PresenceHandler(BasePresenceHandler): ) for destination, states in hosts_to_states.items(): - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( states, [destination] ) @@ -1508,7 +1510,7 @@ class PresenceHandler(BasePresenceHandler): or state.status_msg is not None ] - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( destinations=newly_joined_remote_hosts, states=states, ) @@ -1519,7 +1521,7 @@ class PresenceHandler(BasePresenceHandler): prev_remote_hosts or newly_joined_remote_hosts ): local_states = await self.current_state_for_users(newly_joined_local_users) - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( destinations=prev_remote_hosts | newly_joined_remote_hosts, states=list(local_states.values()), ) @@ -2182,7 +2184,7 @@ class PresenceFederationQueue: index = bisect(self._queue, (clear_before,)) self._queue = self._queue[index:] - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Collection[UserPresenceState], destinations: StrCollection ) -> None: """Send the presence states to the given destinations. @@ -2202,7 +2204,7 @@ class PresenceFederationQueue: return if self._federation: - self._federation.send_presence_to_destinations( + await self._federation.send_presence_to_destinations( states=states, destinations=destinations, ) @@ -2325,7 +2327,7 @@ class PresenceFederationQueue: for host, user_ids in hosts_to_users.items(): states = await self._presence_handler.current_state_for_users(user_ids) - self._federation.send_presence_to_destinations( + await self._federation.send_presence_to_destinations( states=states.values(), destinations=[host], ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 7aeae5319c..4b4227003d 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -26,9 +26,10 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.replication.tcp.streams import TypingStream from synapse.streams import EventSource -from synapse.types import JsonDict, Requester, StreamKeyType, UserID +from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure +from synapse.util.retryutils import filter_destinations_by_retry_limiter from synapse.util.wheel_timer import WheelTimer if TYPE_CHECKING: @@ -150,8 +151,15 @@ class FollowerTypingHandler: now=now, obj=member, then=now + FEDERATION_PING_INTERVAL ) - hosts = await self._storage_controllers.state.get_current_hosts_in_room( - member.room_id + hosts: StrCollection = ( + await self._storage_controllers.state.get_current_hosts_in_room( + member.room_id + ) + ) + hosts = await filter_destinations_by_retry_limiter( + hosts, + clock=self.clock, + store=self.store, ) for domain in hosts: if not self.is_mine_server_name(domain): |