Catch-up after Federation Outage (bonus): Catch-up on Synapse Startup (#8322)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
* Fix _set_destination_retry_timings
This came about because the code assumed that retry_interval
could not be NULL — which has been challenged by catch-up.
1 files changed, 51 insertions, 0 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 41a726878d..8bb17b3a05 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -55,6 +55,15 @@ sent_pdus_destination_dist_total = Counter(
"Total number of PDUs queued for sending across all destinations",
)
+# Time (in s) after Synapse's startup that we will begin to wake up destinations
+# that have catch-up outstanding.
+CATCH_UP_STARTUP_DELAY_SEC = 15
+
+# Time (in s) to wait in between waking up each destination, i.e. one destination
+# will be woken up every <x> seconds after Synapse's startup until we have woken
+# every destination has outstanding catch-up.
+CATCH_UP_STARTUP_INTERVAL_SEC = 5
+
class FederationSender:
def __init__(self, hs: "synapse.server.HomeServer"):
@@ -125,6 +134,14 @@ class FederationSender:
1000.0 / hs.config.federation_rr_transactions_per_room_per_second
)
+ # wake up destinations that have outstanding PDUs to be caught up
+ self._catchup_after_startup_timer = self.clock.call_later(
+ CATCH_UP_STARTUP_DELAY_SEC,
+ run_as_background_process,
+ "wake_destinations_needing_catchup",
+ self._wake_destinations_needing_catchup,
+ )
+
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
@@ -560,3 +577,37 @@ class FederationSender:
# Dummy implementation for case where federation sender isn't offloaded
# to a worker.
return [], 0, False
+
+ async def _wake_destinations_needing_catchup(self):
+ """
+ Wakes up destinations that need catch-up and are not currently being
+ backed off from.
+
+ In order to reduce load spikes, adds a delay between each destination.
+ """
+
+ last_processed = None # type: Optional[str]
+
+ while True:
+ destinations_to_wake = await self.store.get_catch_up_outstanding_destinations(
+ last_processed
+ )
+
+ if not destinations_to_wake:
+ # finished waking all destinations!
+ self._catchup_after_startup_timer = None
+ break
+
+ destinations_to_wake = [
+ d
+ for d in destinations_to_wake
+ if self._federation_shard_config.should_handle(self._instance_name, d)
+ ]
+
+ for last_processed in destinations_to_wake:
+ logger.info(
+ "Destination %s has outstanding catch-up, waking up.",
+ last_processed,
+ )
+ self.wake_destination(last_processed)
+ await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
|