diff options
author | Erik Johnston <erik@matrix.org> | 2016-04-01 14:06:00 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-04-01 14:06:00 +0100 |
commit | 8d73cd502bd8ee6903c81f20f79fe5e1509692e3 (patch) | |
tree | ab50c21d3746ea5f56846bb1633e9ede7151f3c5 /synapse/util/async.py | |
parent | Filter rooms list before chunking (diff) | |
download | synapse-8d73cd502bd8ee6903c81f20f79fe5e1509692e3.tar.xz |
Add concurrently_execute function
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r-- | synapse/util/async.py | 32 |
1 files changed, 31 insertions, 1 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index 640fae3890..a75e1c71fb 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,7 +16,8 @@ from twisted.internet import defer, reactor -from .logcontext import PreserveLoggingContext +from .logcontext import PreserveLoggingContext, preserve_fn +from synapse.util import unwrapFirstError @defer.inlineCallbacks @@ -107,3 +108,32 @@ class ObservableDeferred(object): return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % ( id(self), self._result, self._deferred, ) + + +def concurrently_execute(func, args, limit): + """Executes the function with each argument conncurrently while limiting + the number of concurrent executions. + + Args: + func (func): Function to execute, should return a deferred. + args (list): List of arguments to pass to func, each invocation of func + gets a signle argument. + limit (int): Maximum number of conccurent executions. + + Returns: + deferred + """ + it = iter(args) + + @defer.inlineCallbacks + def _concurrently_execute_inner(): + try: + while True: + yield func(it.next()) + except StopIteration: + pass + + return defer.gatherResults([ + preserve_fn(_concurrently_execute_inner)() + for _ in xrange(limit) + ], consumeErrors=True).addErrback(unwrapFirstError) |