diff options
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r-- | synapse/util/async.py | 77 |
1 files changed, 73 insertions, 4 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index 0729bb2863..9dd4e6b5bc 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -15,9 +15,11 @@ from twisted.internet import defer, reactor +from twisted.internet.defer import CancelledError +from twisted.python import failure from .logcontext import ( - PreserveLoggingContext, make_deferred_yieldable, preserve_fn + PreserveLoggingContext, make_deferred_yieldable, run_in_background ) from synapse.util import logcontext, unwrapFirstError @@ -25,6 +27,8 @@ from contextlib import contextmanager import logging +from six.moves import range + logger = logging.getLogger(__name__) @@ -156,13 +160,13 @@ def concurrently_execute(func, args, limit): def _concurrently_execute_inner(): try: while True: - yield func(it.next()) + yield func(next(it)) except StopIteration: pass return logcontext.make_deferred_yieldable(defer.gatherResults([ - preserve_fn(_concurrently_execute_inner)() - for _ in xrange(limit) + run_in_background(_concurrently_execute_inner) + for _ in range(limit) ], consumeErrors=True)).addErrback(unwrapFirstError) @@ -392,3 +396,68 @@ class ReadWriteLock(object): self.key_to_current_writer.pop(key) defer.returnValue(_ctx_manager()) + + +class DeferredTimeoutError(Exception): + """ + This error is raised by default when a L{Deferred} times out. + """ + + +def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None): + """ + Add a timeout to a deferred by scheduling it to be cancelled after + timeout seconds. + + This is essentially a backport of deferred.addTimeout, which was introduced + in twisted 16.5. + + If the deferred gets timed out, it errbacks with a DeferredTimeoutError, + unless a cancelable function was passed to its initialization or unless + a different on_timeout_cancel callable is provided. + + Args: + deferred (defer.Deferred): deferred to be timed out + timeout (Number): seconds to time out after + + on_timeout_cancel (callable): A callable which is called immediately + after the deferred times out, and not if this deferred is + otherwise cancelled before the timeout. + + It takes an arbitrary value, which is the value of the deferred at + that exact point in time (probably a CancelledError Failure), and + the timeout. + + The default callable (if none is provided) will translate a + CancelledError Failure into a DeferredTimeoutError. + """ + timed_out = [False] + + def time_it_out(): + timed_out[0] = True + deferred.cancel() + + delayed_call = reactor.callLater(timeout, time_it_out) + + def convert_cancelled(value): + if timed_out[0]: + to_call = on_timeout_cancel or _cancelled_to_timed_out_error + return to_call(value, timeout) + return value + + deferred.addBoth(convert_cancelled) + + def cancel_timeout(result): + # stop the pending call to cancel the deferred if it's been fired + if delayed_call.active(): + delayed_call.cancel() + return result + + deferred.addBoth(cancel_timeout) + + +def _cancelled_to_timed_out_error(value, timeout): + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise DeferredTimeoutError(timeout, "Deferred") + return value |