diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/sender/__init__.py | 51 |
1 files changed, 51 insertions, 0 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 41a726878d..8bb17b3a05 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -55,6 +55,15 @@ sent_pdus_destination_dist_total = Counter( "Total number of PDUs queued for sending across all destinations", ) +# Time (in s) after Synapse's startup that we will begin to wake up destinations +# that have catch-up outstanding. +CATCH_UP_STARTUP_DELAY_SEC = 15 + +# Time (in s) to wait in between waking up each destination, i.e. one destination +# will be woken up every <x> seconds after Synapse's startup until we have woken +# every destination has outstanding catch-up. +CATCH_UP_STARTUP_INTERVAL_SEC = 5 + class FederationSender: def __init__(self, hs: "synapse.server.HomeServer"): @@ -125,6 +134,14 @@ class FederationSender: 1000.0 / hs.config.federation_rr_transactions_per_room_per_second ) + # wake up destinations that have outstanding PDUs to be caught up + self._catchup_after_startup_timer = self.clock.call_later( + CATCH_UP_STARTUP_DELAY_SEC, + run_as_background_process, + "wake_destinations_needing_catchup", + self._wake_destinations_needing_catchup, + ) + def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: """Get or create a PerDestinationQueue for the given destination @@ -560,3 +577,37 @@ class FederationSender: # Dummy implementation for case where federation sender isn't offloaded # to a worker. return [], 0, False + + async def _wake_destinations_needing_catchup(self): + """ + Wakes up destinations that need catch-up and are not currently being + backed off from. + + In order to reduce load spikes, adds a delay between each destination. + """ + + last_processed = None # type: Optional[str] + + while True: + destinations_to_wake = await self.store.get_catch_up_outstanding_destinations( + last_processed + ) + + if not destinations_to_wake: + # finished waking all destinations! + self._catchup_after_startup_timer = None + break + + destinations_to_wake = [ + d + for d in destinations_to_wake + if self._federation_shard_config.should_handle(self._instance_name, d) + ] + + for last_processed in destinations_to_wake: + logger.info( + "Destination %s has outstanding catch-up, waking up.", + last_processed, + ) + self.wake_destination(last_processed) + await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC) |