diff options
author | Erik Johnston <erik@matrix.org> | 2016-04-01 15:02:59 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-04-01 15:02:59 +0100 |
commit | a853cdec5beb74624e5ab4ec93bce7c37a67cf3e (patch) | |
tree | cf03ca30a3617e5c47e5d34eb7cd094dffdc1a48 /synapse/util | |
parent | Merge pull request #682 from matrix-org/markjh/fix_invalidate (diff) | |
parent | Comment (diff) | |
download | synapse-a853cdec5beb74624e5ab4ec93bce7c37a67cf3e.tar.xz |
Merge pull request #685 from matrix-org/erikj/sync_leave
Add concurrently_execute function
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/async.py | 32 |
1 files changed, 31 insertions, 1 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index 640fae3890..cd4d90f3cf 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,7 +16,8 @@ from twisted.internet import defer, reactor -from .logcontext import PreserveLoggingContext +from .logcontext import PreserveLoggingContext, preserve_fn +from synapse.util import unwrapFirstError @defer.inlineCallbacks @@ -107,3 +108,32 @@ class ObservableDeferred(object): return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % ( id(self), self._result, self._deferred, ) + + +def concurrently_execute(func, args, limit): + """Executes the function with each argument conncurrently while limiting + the number of concurrent executions. + + Args: + func (func): Function to execute, should return a deferred. + args (list): List of arguments to pass to func, each invocation of func + gets a signle argument. + limit (int): Maximum number of conccurent executions. + + Returns: + deferred: Resolved when all function invocations have finished. + """ + it = iter(args) + + @defer.inlineCallbacks + def _concurrently_execute_inner(): + try: + while True: + yield func(it.next()) + except StopIteration: + pass + + return defer.gatherResults([ + preserve_fn(_concurrently_execute_inner)() + for _ in xrange(limit) + ], consumeErrors=True).addErrback(unwrapFirstError) |