diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 97abbdee18..fb20fd8a10 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -147,7 +147,10 @@ from twisted.internet import defer
import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
-from synapse.federation.sender.per_destination_queue import PerDestinationQueue
+from synapse.federation.sender.per_destination_queue import (
+ CATCHUP_RETRY_INTERVAL,
+ PerDestinationQueue,
+)
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -161,9 +164,10 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.util import Clock
from synapse.util.metrics import Measure
+from synapse.util.retryutils import filter_destinations_by_retry_limiter
if TYPE_CHECKING:
from synapse.events.presence_router import PresenceRouter
@@ -213,7 +217,7 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def send_presence_to_destinations(
+ async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
@@ -242,9 +246,11 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def send_device_messages(self, destination: str, immediate: bool = True) -> None:
+ async def send_device_messages(
+ self, destinations: StrCollection, immediate: bool = True
+ ) -> None:
"""Tells the sender that a new device message is ready to be sent to the
- destination. The `immediate` flag specifies whether the messages should
+ destinations. The `immediate` flag specifies whether the messages should
be tried to be sent immediately, or whether it can be delayed for a
short while (to aid performance).
"""
@@ -716,6 +722,13 @@ class FederationSender(AbstractFederationSender):
pdu.internal_metadata.stream_ordering,
)
+ destinations = await filter_destinations_by_retry_limiter(
+ destinations,
+ clock=self.clock,
+ store=self.store,
+ retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+ )
+
for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu)
@@ -763,12 +776,20 @@ class FederationSender(AbstractFederationSender):
domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
)
- domains = [
+ domains: StrCollection = [
d
for d in domains_set
if not self.is_mine_server_name(d)
and self._federation_shard_config.should_handle(self._instance_name, d)
]
+
+ domains = await filter_destinations_by_retry_limiter(
+ domains,
+ clock=self.clock,
+ store=self.store,
+ retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+ )
+
if not domains:
return
@@ -816,7 +837,7 @@ class FederationSender(AbstractFederationSender):
for queue in queues:
queue.flush_read_receipts_for_room(room_id)
- def send_presence_to_destinations(
+ async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
@@ -831,13 +852,20 @@ class FederationSender(AbstractFederationSender):
for state in states:
assert self.is_mine_id(state.user_id)
+ destinations = await filter_destinations_by_retry_limiter(
+ [
+ d
+ for d in destinations
+ if self._federation_shard_config.should_handle(self._instance_name, d)
+ ],
+ clock=self.clock,
+ store=self.store,
+ retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+ )
+
for destination in destinations:
if self.is_mine_server_name(destination):
continue
- if not self._federation_shard_config.should_handle(
- self._instance_name, destination
- ):
- continue
self._get_per_destination_queue(destination).send_presence(
states, start_loop=False
@@ -896,21 +924,29 @@ class FederationSender(AbstractFederationSender):
else:
queue.send_edu(edu)
- def send_device_messages(self, destination: str, immediate: bool = True) -> None:
- if self.is_mine_server_name(destination):
- logger.warning("Not sending device update to ourselves")
- return
-
- if not self._federation_shard_config.should_handle(
- self._instance_name, destination
- ):
- return
+ async def send_device_messages(
+ self, destinations: StrCollection, immediate: bool = True
+ ) -> None:
+ destinations = await filter_destinations_by_retry_limiter(
+ [
+ destination
+ for destination in destinations
+ if self._federation_shard_config.should_handle(
+ self._instance_name, destination
+ )
+ and not self.is_mine_server_name(destination)
+ ],
+ clock=self.clock,
+ store=self.store,
+ retry_due_within_ms=CATCHUP_RETRY_INTERVAL,
+ )
- if immediate:
- self._get_per_destination_queue(destination).attempt_new_transaction()
- else:
- self._get_per_destination_queue(destination).mark_new_data()
- self._destination_wakeup_queue.add_to_queue(destination)
+ for destination in destinations:
+ if immediate:
+ self._get_per_destination_queue(destination).attempt_new_transaction()
+ else:
+ self._get_per_destination_queue(destination).mark_new_data()
+ self._destination_wakeup_queue.add_to_queue(destination)
def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote.
|