Immediately retry any requests that have backed off when a server comes back online. (#12500)
Otherwise it can take up to a minute for any in-flight `/send` requests to be retried.
1 files changed, 57 insertions, 0 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index b91020117f..7f1d41eb3c 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -778,3 +778,60 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]:
new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
deferred.chainDeferred(new_deferred)
return new_deferred
+
+
+class AwakenableSleeper:
+ """Allows explicitly waking up deferreds related to an entity that are
+ currently sleeping.
+ """
+
+ def __init__(self, reactor: IReactorTime) -> None:
+ self._streams: Dict[str, Set[defer.Deferred[None]]] = {}
+ self._reactor = reactor
+
+ def wake(self, name: str) -> None:
+ """Wake everything related to `name` that is currently sleeping."""
+ stream_set = self._streams.pop(name, set())
+ for deferred in stream_set:
+ try:
+ with PreserveLoggingContext():
+ deferred.callback(None)
+ except Exception:
+ pass
+
+ async def sleep(self, name: str, delay_ms: int) -> None:
+ """Sleep for the given number of milliseconds, or return if the given
+ `name` is explicitly woken up.
+ """
+
+ # Create a deferred that gets called in N seconds
+ sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
+ call = self._reactor.callLater(delay_ms / 1000, sleep_deferred.callback, None)
+
+ # Create a deferred that will get called if `wake` is called with
+ # the same `name`.
+ stream_set = self._streams.setdefault(name, set())
+ notify_deferred: "defer.Deferred[None]" = defer.Deferred()
+ stream_set.add(notify_deferred)
+
+ try:
+ # Wait for either the delay or for `wake` to be called.
+ await make_deferred_yieldable(
+ defer.DeferredList(
+ [sleep_deferred, notify_deferred],
+ fireOnOneCallback=True,
+ fireOnOneErrback=True,
+ consumeErrors=True,
+ )
+ )
+ finally:
+ # Clean up the state
+ curr_stream_set = self._streams.get(name)
+ if curr_stream_set is not None:
+ curr_stream_set.discard(notify_deferred)
+ if len(curr_stream_set) == 0:
+ self._streams.pop(name)
+
+ # Cancel the sleep if we were woken up
+ if call.active():
+ call.cancel()
|