diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 41a726878d..8bb17b3a05 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -55,6 +55,15 @@ sent_pdus_destination_dist_total = Counter(
"Total number of PDUs queued for sending across all destinations",
)
+# Time (in s) after Synapse's startup that we will begin to wake up destinations
+# that have catch-up outstanding.
+CATCH_UP_STARTUP_DELAY_SEC = 15
+
+# Time (in s) to wait in between waking up each destination, i.e. one destination
+# will be woken up every <x> seconds after Synapse's startup until we have woken
+# every destination that has outstanding catch-up.
+CATCH_UP_STARTUP_INTERVAL_SEC = 5
+
class FederationSender:
def __init__(self, hs: "synapse.server.HomeServer"):
@@ -125,6 +134,14 @@ class FederationSender:
1000.0 / hs.config.federation_rr_transactions_per_room_per_second
)
+ # wake up destinations that have outstanding PDUs to be caught up
+ self._catchup_after_startup_timer = self.clock.call_later(
+ CATCH_UP_STARTUP_DELAY_SEC,
+ run_as_background_process,
+ "wake_destinations_needing_catchup",
+ self._wake_destinations_needing_catchup,
+ )
+
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
@@ -560,3 +577,37 @@ class FederationSender:
# Dummy implementation for case where federation sender isn't offloaded
# to a worker.
return [], 0, False
+
+ async def _wake_destinations_needing_catchup(self):
+ """
+ Wakes up destinations that need catch-up and are not currently being
+ backed off from.
+
+ In order to reduce load spikes, adds a delay between each destination.
+ """
+
+ last_processed = None # type: Optional[str]
+
+ while True:
+ destinations_to_wake = await self.store.get_catch_up_outstanding_destinations(
+ last_processed
+ )
+
+ if not destinations_to_wake:
+ # finished waking all destinations!
+ self._catchup_after_startup_timer = None
+ break
+
+ destinations_to_wake = [
+ d
+ for d in destinations_to_wake
+ if self._federation_shard_config.should_handle(self._instance_name, d)
+ ]
+
+ for last_processed in destinations_to_wake:
+ logger.info(
+ "Destination %s has outstanding catch-up, waking up.",
+ last_processed,
+ )
+ self.wake_destination(last_processed)
+ await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 99cffff50c..97aed1500e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -218,6 +218,7 @@ class TransactionStore(SQLBaseStore):
retry_interval = EXCLUDED.retry_interval
WHERE
EXCLUDED.retry_interval = 0
+ OR destinations.retry_interval IS NULL
OR destinations.retry_interval < EXCLUDED.retry_interval
"""
@@ -249,7 +250,11 @@ class TransactionStore(SQLBaseStore):
"retry_interval": retry_interval,
},
)
- elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
+ elif (
+ retry_interval == 0
+ or prev_row["retry_interval"] is None
+ or prev_row["retry_interval"] < retry_interval
+ ):
self.db_pool.simple_update_one_txn(
txn,
"destinations",
@@ -397,7 +402,7 @@ class TransactionStore(SQLBaseStore):
@staticmethod
def _get_catch_up_room_event_ids_txn(
- txn, destination: str, last_successful_stream_ordering: int,
+ txn: LoggingTransaction, destination: str, last_successful_stream_ordering: int,
) -> List[str]:
q = """
SELECT event_id FROM destination_rooms
@@ -412,3 +417,60 @@ class TransactionStore(SQLBaseStore):
)
event_ids = [row[0] for row in txn]
return event_ids
+
+ async def get_catch_up_outstanding_destinations(
+ self, after_destination: Optional[str]
+ ) -> List[str]:
+ """
+ Gets at most 25 destinations which have outstanding PDUs to be caught up,
+        and are not being backed off from.
+ Args:
+ after_destination:
+ If provided, all destinations must be lexicographically greater
+ than this one.
+
+ Returns:
+ list of up to 25 destinations with outstanding catch-up.
+ These are the lexicographically first destinations which are
+ lexicographically greater than after_destination (if provided).
+ """
+ time = self.hs.get_clock().time_msec()
+
+ return await self.db_pool.runInteraction(
+ "get_catch_up_outstanding_destinations",
+ self._get_catch_up_outstanding_destinations_txn,
+ time,
+ after_destination,
+ )
+
+ @staticmethod
+ def _get_catch_up_outstanding_destinations_txn(
+ txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
+ ) -> List[str]:
+ q = """
+ SELECT destination FROM destinations
+ WHERE destination IN (
+ SELECT destination FROM destination_rooms
+ WHERE destination_rooms.stream_ordering >
+ destinations.last_successful_stream_ordering
+ )
+ AND destination > ?
+ AND (
+ retry_last_ts IS NULL OR
+ retry_last_ts + retry_interval < ?
+ )
+ ORDER BY destination
+ LIMIT 25
+ """
+ txn.execute(
+ q,
+ (
+ # everything is lexicographically greater than "" so this gives
+ # us the first batch of up to 25.
+ after_destination or "",
+ now_time_ms,
+ ),
+ )
+
+ destinations = [row[0] for row in txn]
+ return destinations