diff options
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/async_helpers.py | 57 | ||||
-rw-r--r-- | synapse/util/retryutils.py | 24 |
2 files changed, 80 insertions, 1 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index b91020117f..7f1d41eb3c 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -778,3 +778,60 @@ def delay_cancellation(awaitable: Awaitable[T]) -> Awaitable[T]: new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel) deferred.chainDeferred(new_deferred) return new_deferred + + +class AwakenableSleeper: + """Allows explicitly waking up deferreds related to an entity that are + currently sleeping. + """ + + def __init__(self, reactor: IReactorTime) -> None: + self._streams: Dict[str, Set[defer.Deferred[None]]] = {} + self._reactor = reactor + + def wake(self, name: str) -> None: + """Wake everything related to `name` that is currently sleeping.""" + stream_set = self._streams.pop(name, set()) + for deferred in stream_set: + try: + with PreserveLoggingContext(): + deferred.callback(None) + except Exception: + pass + + async def sleep(self, name: str, delay_ms: int) -> None: + """Sleep for the given number of milliseconds, or return if the given + `name` is explicitly woken up. + """ + + # Create a deferred that gets called in N seconds + sleep_deferred: "defer.Deferred[None]" = defer.Deferred() + call = self._reactor.callLater(delay_ms / 1000, sleep_deferred.callback, None) + + # Create a deferred that will get called if `wake` is called with + # the same `name`. + stream_set = self._streams.setdefault(name, set()) + notify_deferred: "defer.Deferred[None]" = defer.Deferred() + stream_set.add(notify_deferred) + + try: + # Wait for either the delay or for `wake` to be called. + await make_deferred_yieldable( + defer.DeferredList( + [sleep_deferred, notify_deferred], + fireOnOneCallback=True, + fireOnOneErrback=True, + consumeErrors=True, + ) + ) + finally: + # Clean up the state + curr_stream_set = self._streams.get(name) + if curr_stream_set is not None: + curr_stream_set.discard(notify_deferred) + if len(curr_stream_set) == 0: + self._streams.pop(name) + + # Cancel the sleep if we were woken up + if call.active(): + call.cancel() diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index d81f2527d7..81bfed268e 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -14,13 +14,17 @@ import logging import random from types import TracebackType -from typing import Any, Optional, Type +from typing import TYPE_CHECKING, Any, Optional, Type import synapse.logging.context from synapse.api.errors import CodeMessageException from synapse.storage import DataStore from synapse.util import Clock +if TYPE_CHECKING: + from synapse.notifier import Notifier + from synapse.replication.tcp.handler import ReplicationCommandHandler + logger = logging.getLogger(__name__) # the initial backoff, after the first transaction fails @@ -131,6 +135,8 @@ class RetryDestinationLimiter: retry_interval: int, backoff_on_404: bool = False, backoff_on_failure: bool = True, + notifier: Optional["Notifier"] = None, + replication_client: Optional["ReplicationCommandHandler"] = None, ): """Marks the destination as "down" if an exception is thrown in the context, except for CodeMessageException with code < 500. @@ -160,6 +166,9 @@ class RetryDestinationLimiter: self.backoff_on_404 = backoff_on_404 self.backoff_on_failure = backoff_on_failure + self.notifier = notifier + self.replication_client = replication_client + def __enter__(self) -> None: pass @@ -239,6 +248,19 @@ class RetryDestinationLimiter: retry_last_ts, self.retry_interval, ) + + if self.notifier: + # Inform the relevant places that the remote server is back up. + self.notifier.notify_remote_server_up(self.destination) + + if self.replication_client: + # If we're on a worker we try and inform master about this. The + # replication client doesn't hook into the notifier to avoid + # infinite loops where we send a `REMOTE_SERVER_UP` command to + # master, which then echoes it back to us which in turn pokes + # the notifier. + self.replication_client.send_remote_server_up(self.destination) + except Exception: logger.exception("Failed to store destination_retry_timings") |