1 files changed, 43 insertions, 0 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 70139beef2..8618bb0651 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -885,3 +885,46 @@ class AwakenableSleeper:
# Cancel the sleep if we were woken up
if call.active():
call.cancel()
+
+
+class DeferredEvent:
+ """Like threading.Event but for async code"""
+
+ def __init__(self, reactor: IReactorTime) -> None:
+ self._reactor = reactor
+ self._deferred: "defer.Deferred[None]" = defer.Deferred()
+
+ def set(self) -> None:
+ if not self._deferred.called:
+ self._deferred.callback(None)
+
+ def clear(self) -> None:
+ if self._deferred.called:
+ self._deferred = defer.Deferred()
+
+ def is_set(self) -> bool:
+ return self._deferred.called
+
+ async def wait(self, timeout_seconds: float) -> bool:
+ if self.is_set():
+ return True
+
+ # Create a deferred that gets called in N seconds
+ sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
+ call = self._reactor.callLater(timeout_seconds, sleep_deferred.callback, None)
+
+ try:
+ await make_deferred_yieldable(
+ defer.DeferredList(
+ [sleep_deferred, self._deferred],
+ fireOnOneCallback=True,
+ fireOnOneErrback=True,
+ consumeErrors=True,
+ )
+ )
+ finally:
+ # Cancel the sleep if we were woken up
+ if call.active():
+ call.cancel()
+
+ return self.is_set()
|