summary refs log tree commit diff
path: root/synapse/federation/sender
diff options
context:
space:
mode:
authorMathieu Velten <mathieuv@matrix.org>2023-06-16 12:15:12 +0200
committerGitHub <noreply@github.com>2023-06-16 10:15:12 +0000
commitf63d4a3a65e95d3845c43a9dd2893605b06f164a (patch)
tree235555c85b5fb77c2f61569364464c0db8682483 /synapse/federation/sender
parentFix unsafe hotserving behaviour for non-multimedia uploads. (#15680) (diff)
downloadsynapse-f63d4a3a65e95d3845c43a9dd2893605b06f164a.tar.xz
Regularly try to wake up dests instead of waiting for next PDU/EDU (#15743)
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r--synapse/federation/sender/__init__.py34
1 files changed, 16 insertions, 18 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index f3bdc5a4d2..97abbdee18 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -109,10 +109,8 @@ was enabled*, Catch-Up Mode is exited and we return to `_transaction_transmissio
 
 If a remote server is unreachable over federation, we back off from that server,
 with an exponentially-increasing retry interval.
-Whilst we don't automatically retry after the interval, we prevent making new attempts
-until such time as the back-off has cleared.
-Once the back-off is cleared and a new PDU or EDU arrives for transmission, the transmission
-loop resumes and empties the queue by making federation requests.
+We automatically retry after the retry interval expires (roughly, the logic to do so
+being triggered every minute).
 
 If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent
 unbounded growth) and Catch-Up Mode is entered.
@@ -145,7 +143,6 @@ from prometheus_client import Counter
 from typing_extensions import Literal
 
 from twisted.internet import defer
-from twisted.internet.interfaces import IDelayedCall
 
 import synapse.metrics
 from synapse.api.presence import UserPresenceState
@@ -184,14 +181,18 @@ sent_pdus_destination_dist_total = Counter(
     "Total number of PDUs queued for sending across all destinations",
 )
 
-# Time (in s) after Synapse's startup that we will begin to wake up destinations
-# that have catch-up outstanding.
-CATCH_UP_STARTUP_DELAY_SEC = 15
+# Time (in s) to wait before trying to wake up destinations that have
+# catch-up outstanding. This will also be the delay applied at startup
+# before trying the same.
+# Please note that rate limiting still applies, so while the loop is
+# executed every X seconds the destinations may not be wake up because
+# they are being rate limited following previous attempt failures.
+WAKEUP_RETRY_PERIOD_SEC = 60
 
 # Time (in s) to wait in between waking up each destination, i.e. one destination
-# will be woken up every <x> seconds after Synapse's startup until we have woken
-# every destination has outstanding catch-up.
-CATCH_UP_STARTUP_INTERVAL_SEC = 5
+# will be woken up every <x> seconds until we have woken every destination
+# has outstanding catch-up.
+WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5
 
 
 class AbstractFederationSender(metaclass=abc.ABCMeta):
@@ -415,12 +416,10 @@ class FederationSender(AbstractFederationSender):
             / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
         )
 
-        # wake up destinations that have outstanding PDUs to be caught up
-        self._catchup_after_startup_timer: Optional[
-            IDelayedCall
-        ] = self.clock.call_later(
-            CATCH_UP_STARTUP_DELAY_SEC,
+        # Regularly wake up destinations that have outstanding PDUs to be caught up
+        self.clock.looping_call(
             run_as_background_process,
+            WAKEUP_RETRY_PERIOD_SEC * 1000.0,
             "wake_destinations_needing_catchup",
             self._wake_destinations_needing_catchup,
         )
@@ -966,7 +965,6 @@ class FederationSender(AbstractFederationSender):
 
             if not destinations_to_wake:
                 # finished waking all destinations!
-                self._catchup_after_startup_timer = None
                 break
 
             last_processed = destinations_to_wake[-1]
@@ -983,4 +981,4 @@ class FederationSender(AbstractFederationSender):
                     last_processed,
                 )
                 self.wake_destination(destination)
-                await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
+                await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)