summary refs log tree commit diff
path: root/synapse/federation/sender/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender/__init__.py')
-rw-r--r--synapse/federation/sender/__init__.py86
1 files changed, 61 insertions, 25 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py

index 97abbdee18..fb20fd8a10 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -147,7 +147,10 @@ from twisted.internet import defer import synapse.metrics from synapse.api.presence import UserPresenceState from synapse.events import EventBase -from synapse.federation.sender.per_destination_queue import PerDestinationQueue +from synapse.federation.sender.per_destination_queue import ( + CATCHUP_RETRY_INTERVAL, + PerDestinationQueue, +) from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -161,9 +164,10 @@ from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, ) -from synapse.types import JsonDict, ReadReceipt, RoomStreamToken +from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection from synapse.util import Clock from synapse.util.metrics import Measure +from synapse.util.retryutils import filter_destinations_by_retry_limiter if TYPE_CHECKING: from synapse.events.presence_router import PresenceRouter @@ -213,7 +217,7 @@ class AbstractFederationSender(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """Send the given presence states to the given destinations. @@ -242,9 +246,11 @@ class AbstractFederationSender(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod - def send_device_messages(self, destination: str, immediate: bool = True) -> None: + async def send_device_messages( + self, destinations: StrCollection, immediate: bool = True + ) -> None: """Tells the sender that a new device message is ready to be sent to the - destination. The `immediate` flag specifies whether the messages should + destinations. The `immediate` flag specifies whether the messages should be tried to be sent immediately, or whether it can be delayed for a short while (to aid performance). """ @@ -716,6 +722,13 @@ class FederationSender(AbstractFederationSender): pdu.internal_metadata.stream_ordering, ) + destinations = await filter_destinations_by_retry_limiter( + destinations, + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu) @@ -763,12 +776,20 @@ class FederationSender(AbstractFederationSender): domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id ) - domains = [ + domains: StrCollection = [ d for d in domains_set if not self.is_mine_server_name(d) and self._federation_shard_config.should_handle(self._instance_name, d) ] + + domains = await filter_destinations_by_retry_limiter( + domains, + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + if not domains: return @@ -816,7 +837,7 @@ class FederationSender(AbstractFederationSender): for queue in queues: queue.flush_read_receipts_for_room(room_id) - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """Send the given presence states to the given destinations. @@ -831,13 +852,20 @@ class FederationSender(AbstractFederationSender): for state in states: assert self.is_mine_id(state.user_id) + destinations = await filter_destinations_by_retry_limiter( + [ + d + for d in destinations + if self._federation_shard_config.should_handle(self._instance_name, d) + ], + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + for destination in destinations: if self.is_mine_server_name(destination): continue - if not self._federation_shard_config.should_handle( - self._instance_name, destination - ): - continue self._get_per_destination_queue(destination).send_presence( states, start_loop=False @@ -896,21 +924,29 @@ class FederationSender(AbstractFederationSender): else: queue.send_edu(edu) - def send_device_messages(self, destination: str, immediate: bool = True) -> None: - if self.is_mine_server_name(destination): - logger.warning("Not sending device update to ourselves") - return - - if not self._federation_shard_config.should_handle( - self._instance_name, destination - ): - return + async def send_device_messages( + self, destinations: StrCollection, immediate: bool = True + ) -> None: + destinations = await filter_destinations_by_retry_limiter( + [ + destination + for destination in destinations + if self._federation_shard_config.should_handle( + self._instance_name, destination + ) + and not self.is_mine_server_name(destination) + ], + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) - if immediate: - self._get_per_destination_queue(destination).attempt_new_transaction() - else: - self._get_per_destination_queue(destination).mark_new_data() - self._destination_wakeup_queue.add_to_queue(destination) + for destination in destinations: + if immediate: + self._get_per_destination_queue(destination).attempt_new_transaction() + else: + self._get_per_destination_queue(destination).mark_new_data() + self._destination_wakeup_queue.add_to_queue(destination) def wake_destination(self, destination: str) -> None: """Called when we want to retry sending transactions to a remote.