diff --git a/changelog.d/15743.misc b/changelog.d/15743.misc
new file mode 100644
index 0000000000..b95eed929e
--- /dev/null
+++ b/changelog.d/15743.misc
@@ -0,0 +1 @@
+Regularly try to send transactions to other servers after they failed instead of waiting for a new event to be available before trying.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index f3bdc5a4d2..97abbdee18 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -109,10 +109,8 @@ was enabled*, Catch-Up Mode is exited and we return to `_transaction_transmissio
If a remote server is unreachable over federation, we back off from that server,
with an exponentially-increasing retry interval.
-Whilst we don't automatically retry after the interval, we prevent making new attempts
-until such time as the back-off has cleared.
-Once the back-off is cleared and a new PDU or EDU arrives for transmission, the transmission
-loop resumes and empties the queue by making federation requests.
+We automatically retry after the retry interval expires (roughly, the logic to do so
+being triggered every minute).
If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent
unbounded growth) and Catch-Up Mode is entered.
@@ -145,7 +143,6 @@ from prometheus_client import Counter
from typing_extensions import Literal
from twisted.internet import defer
-from twisted.internet.interfaces import IDelayedCall
import synapse.metrics
from synapse.api.presence import UserPresenceState
@@ -184,14 +181,18 @@ sent_pdus_destination_dist_total = Counter(
"Total number of PDUs queued for sending across all destinations",
)
-# Time (in s) after Synapse's startup that we will begin to wake up destinations
-# that have catch-up outstanding.
-CATCH_UP_STARTUP_DELAY_SEC = 15
+# Time (in s) to wait before trying to wake up destinations that have
+# catch-up outstanding. This will also be the delay applied at startup
+# before trying the same.
+# Please note that rate limiting still applies, so while the loop is
+# executed every X seconds the destinations may not be wake up because
+# they are being rate limited following previous attempt failures.
+WAKEUP_RETRY_PERIOD_SEC = 60
# Time (in s) to wait in between waking up each destination, i.e. one destination
-# will be woken up every <x> seconds after Synapse's startup until we have woken
-# every destination has outstanding catch-up.
-CATCH_UP_STARTUP_INTERVAL_SEC = 5
+# will be woken up every <x> seconds until we have woken every destination
+# has outstanding catch-up.
+WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5
class AbstractFederationSender(metaclass=abc.ABCMeta):
@@ -415,12 +416,10 @@ class FederationSender(AbstractFederationSender):
/ hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
)
- # wake up destinations that have outstanding PDUs to be caught up
- self._catchup_after_startup_timer: Optional[
- IDelayedCall
- ] = self.clock.call_later(
- CATCH_UP_STARTUP_DELAY_SEC,
+ # Regularly wake up destinations that have outstanding PDUs to be caught up
+ self.clock.looping_call(
run_as_background_process,
+ WAKEUP_RETRY_PERIOD_SEC * 1000.0,
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)
@@ -966,7 +965,6 @@ class FederationSender(AbstractFederationSender):
if not destinations_to_wake:
# finished waking all destinations!
- self._catchup_after_startup_timer = None
break
last_processed = destinations_to_wake[-1]
@@ -983,4 +981,4 @@ class FederationSender(AbstractFederationSender):
last_processed,
)
self.wake_destination(destination)
- await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
+ await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index 391ae51707..b290b020a2 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -431,28 +431,24 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
# ACT: call _wake_destinations_needing_catchup
# patch wake_destination to just count the destinations instead
- woken = []
+ woken = set()
def wake_destination_track(destination: str) -> None:
- woken.append(destination)
+ woken.add(destination)
self.federation_sender.wake_destination = wake_destination_track # type: ignore[assignment]
- # cancel the pre-existing timer for _wake_destinations_needing_catchup
- # this is because we are calling it manually rather than waiting for it
- # to be called automatically
- assert self.federation_sender._catchup_after_startup_timer is not None
- self.federation_sender._catchup_after_startup_timer.cancel()
-
- self.get_success(
- self.federation_sender._wake_destinations_needing_catchup(), by=5.0
- )
+ # We wait quite long so that all dests can be woken up, since there is a delay
+ # between them.
+ self.pump(by=5.0)
# ASSERT (_wake_destinations_needing_catchup):
# - all remotes are woken up, save for zzzerver
self.assertNotIn("zzzerver", woken)
- # - all destinations are woken exactly once; they appear once in woken.
- self.assertCountEqual(woken, server_names[:-1])
+ # - all destinations are woken, potentially more than once, since the
+ # wake up is called regularly and we don't ack in this test that a transaction
+ # has been successfully sent.
+ self.assertCountEqual(woken, set(server_names[:-1]))
def test_not_latest_event(self) -> None:
"""Test that we send the latest event in the room even if its not ours."""
|