From 36efbcaf511790d6f1dd7df2260900f07489bda6 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Fri, 18 Sep 2020 14:59:13 +0100 Subject: Catch-up after Federation Outage (bonus): Catch-up on Synapse Startup (#8322) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Olivier Wilkinson (reivilibre) Co-authored-by: Patrick Cloke * Fix _set_destination_retry_timings This came about because the code assumed that retry_interval could not be NULL — which has been challenged by catch-up. --- synapse/federation/sender/__init__.py | 51 +++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) (limited to 'synapse/federation/sender') diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 41a726878d..8bb17b3a05 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -55,6 +55,15 @@ sent_pdus_destination_dist_total = Counter( "Total number of PDUs queued for sending across all destinations", ) +# Time (in s) after Synapse's startup that we will begin to wake up destinations +# that have catch-up outstanding. +CATCH_UP_STARTUP_DELAY_SEC = 15 + +# Time (in s) to wait in between waking up each destination, i.e. one destination +# will be woken up every seconds after Synapse's startup until we have woken +# every destination has outstanding catch-up. +CATCH_UP_STARTUP_INTERVAL_SEC = 5 + class FederationSender: def __init__(self, hs: "synapse.server.HomeServer"): @@ -125,6 +134,14 @@ class FederationSender: 1000.0 / hs.config.federation_rr_transactions_per_room_per_second ) + # wake up destinations that have outstanding PDUs to be caught up + self._catchup_after_startup_timer = self.clock.call_later( + CATCH_UP_STARTUP_DELAY_SEC, + run_as_background_process, + "wake_destinations_needing_catchup", + self._wake_destinations_needing_catchup, + ) + def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: """Get or create a PerDestinationQueue for the given destination @@ -560,3 +577,37 @@ class FederationSender: # Dummy implementation for case where federation sender isn't offloaded # to a worker. return [], 0, False + + async def _wake_destinations_needing_catchup(self): + """ + Wakes up destinations that need catch-up and are not currently being + backed off from. + + In order to reduce load spikes, adds a delay between each destination. + """ + + last_processed = None # type: Optional[str] + + while True: + destinations_to_wake = await self.store.get_catch_up_outstanding_destinations( + last_processed + ) + + if not destinations_to_wake: + # finished waking all destinations! + self._catchup_after_startup_timer = None + break + + destinations_to_wake = [ + d + for d in destinations_to_wake + if self._federation_shard_config.should_handle(self._instance_name, d) + ] + + for last_processed in destinations_to_wake: + logger.info( + "Destination %s has outstanding catch-up, waking up.", + last_processed, + ) + self.wake_destination(last_processed) + await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC) -- cgit 1.4.1