summary refs log tree commit diff
path: root/synapse/util/async_helpers.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-08-14 12:41:53 +0100
committerGitHub <noreply@github.com>2024-08-14 12:41:53 +0100
commita51daffba5e58489f93f76a074aa7d6f73533226 (patch)
tree763af7bc988d7617825aefb30f56a4b250a048f4 /synapse/util/async_helpers.py
parentHandle lower-case http headers in `_Mulitpart_Parser_Protocol` (#17545) (diff)
downloadsynapse-a51daffba5e58489f93f76a074aa7d6f73533226.tar.xz
Reduce concurrent thread usage in media (#17567)
Follow on from #17558

Basically, we want to reduce the number of threads we want to use at a
time, i.e. reduce the number of threads that are paused/blocked. We do
this by returning from the thread when the consumer pauses the producer,
rather than pausing in the thread.

---------

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
Diffstat (limited to '')
-rw-r--r--synapse/util/async_helpers.py43
1 files changed, 43 insertions, 0 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py

index 70139beef2..8618bb0651 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py
@@ -885,3 +885,46 @@ class AwakenableSleeper: # Cancel the sleep if we were woken up if call.active(): call.cancel() + + +class DeferredEvent: + """Like threading.Event but for async code""" + + def __init__(self, reactor: IReactorTime) -> None: + self._reactor = reactor + self._deferred: "defer.Deferred[None]" = defer.Deferred() + + def set(self) -> None: + if not self._deferred.called: + self._deferred.callback(None) + + def clear(self) -> None: + if self._deferred.called: + self._deferred = defer.Deferred() + + def is_set(self) -> bool: + return self._deferred.called + + async def wait(self, timeout_seconds: float) -> bool: + if self.is_set(): + return True + + # Create a deferred that gets called in N seconds + sleep_deferred: "defer.Deferred[None]" = defer.Deferred() + call = self._reactor.callLater(timeout_seconds, sleep_deferred.callback, None) + + try: + await make_deferred_yieldable( + defer.DeferredList( + [sleep_deferred, self._deferred], + fireOnOneCallback=True, + fireOnOneErrback=True, + consumeErrors=True, + ) + ) + finally: + # Cancel the sleep if we were woken up + if call.active(): + call.cancel() + + return self.is_set()