From 8d73cd502bd8ee6903c81f20f79fe5e1509692e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Apr 2016 14:06:00 +0100 Subject: Add concurrently_execute function --- synapse/util/async.py | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index 640fae3890..a75e1c71fb 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,7 +16,8 @@ from twisted.internet import defer, reactor -from .logcontext import PreserveLoggingContext +from .logcontext import PreserveLoggingContext, preserve_fn +from synapse.util import unwrapFirstError @defer.inlineCallbacks @@ -107,3 +108,32 @@ class ObservableDeferred(object): return "" % ( id(self), self._result, self._deferred, ) + + +def concurrently_execute(func, args, limit): + """Executes the function with each argument conncurrently while limiting + the number of concurrent executions. + + Args: + func (func): Function to execute, should return a deferred. + args (list): List of arguments to pass to func, each invocation of func + gets a signle argument. + limit (int): Maximum number of conccurent executions. + + Returns: + deferred + """ + it = iter(args) + + @defer.inlineCallbacks + def _concurrently_execute_inner(): + try: + while True: + yield func(it.next()) + except StopIteration: + pass + + return defer.gatherResults([ + preserve_fn(_concurrently_execute_inner)() + for _ in xrange(limit) + ], consumeErrors=True).addErrback(unwrapFirstError) -- cgit 1.4.1 From 3f4eb4c92402d80d0f41501bf71a60a1b94f2756 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Apr 2016 14:15:27 +0100 Subject: Comment --- synapse/util/async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index a75e1c71fb..cd4d90f3cf 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -121,7 +121,7 @@ def concurrently_execute(func, args, limit): limit (int): Maximum number of conccurent executions. Returns: - deferred + deferred: Resolved when all function invocations have finished. """ it = iter(args) -- cgit 1.4.1