From f63d4a3a65e95d3845c43a9dd2893605b06f164a Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Fri, 16 Jun 2023 12:15:12 +0200 Subject: Regularly try to wake up dests instead of waiting for next PDU/EDU (#15743) --- synapse/federation/sender/__init__.py | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index f3bdc5a4d2..97abbdee18 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -109,10 +109,8 @@ was enabled*, Catch-Up Mode is exited and we return to `_transaction_transmissio If a remote server is unreachable over federation, we back off from that server, with an exponentially-increasing retry interval. -Whilst we don't automatically retry after the interval, we prevent making new attempts -until such time as the back-off has cleared. -Once the back-off is cleared and a new PDU or EDU arrives for transmission, the transmission -loop resumes and empties the queue by making federation requests. +We automatically retry after the retry interval expires (roughly, the logic to do so +being triggered every minute). If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent unbounded growth) and Catch-Up Mode is entered. @@ -145,7 +143,6 @@ from prometheus_client import Counter from typing_extensions import Literal from twisted.internet import defer -from twisted.internet.interfaces import IDelayedCall import synapse.metrics from synapse.api.presence import UserPresenceState @@ -184,14 +181,18 @@ sent_pdus_destination_dist_total = Counter( "Total number of PDUs queued for sending across all destinations", ) -# Time (in s) after Synapse's startup that we will begin to wake up destinations -# that have catch-up outstanding. -CATCH_UP_STARTUP_DELAY_SEC = 15 +# Time (in s) to wait before trying to wake up destinations that have +# catch-up outstanding. This will also be the delay applied at startup +# before trying the same. +# Please note that rate limiting still applies, so while the loop is +# executed every X seconds the destinations may not be wake up because +# they are being rate limited following previous attempt failures. +WAKEUP_RETRY_PERIOD_SEC = 60 # Time (in s) to wait in between waking up each destination, i.e. one destination -# will be woken up every seconds after Synapse's startup until we have woken -# every destination has outstanding catch-up. -CATCH_UP_STARTUP_INTERVAL_SEC = 5 +# will be woken up every seconds until we have woken every destination +# has outstanding catch-up. +WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5 class AbstractFederationSender(metaclass=abc.ABCMeta): @@ -415,12 +416,10 @@ class FederationSender(AbstractFederationSender): / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second ) - # wake up destinations that have outstanding PDUs to be caught up - self._catchup_after_startup_timer: Optional[ - IDelayedCall - ] = self.clock.call_later( - CATCH_UP_STARTUP_DELAY_SEC, + # Regularly wake up destinations that have outstanding PDUs to be caught up + self.clock.looping_call( run_as_background_process, + WAKEUP_RETRY_PERIOD_SEC * 1000.0, "wake_destinations_needing_catchup", self._wake_destinations_needing_catchup, ) @@ -966,7 +965,6 @@ class FederationSender(AbstractFederationSender): if not destinations_to_wake: # finished waking all destinations! - self._catchup_after_startup_timer = None break last_processed = destinations_to_wake[-1] @@ -983,4 +981,4 @@ class FederationSender(AbstractFederationSender): last_processed, ) self.wake_destination(destination) - await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC) + await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC) -- cgit 1.4.1