diff options
author | Matthew Hodgson <matthew@matrix.org> | 2016-04-04 00:38:21 +0100 |
---|---|---|
committer | Matthew Hodgson <matthew@matrix.org> | 2016-04-04 00:38:21 +0100 |
commit | 9f7dc2bef7cd39645ae74d96f2919a3f9fdb65ac (patch) | |
tree | b818eb0b962de7e860584bb5286f98e60e12ac75 /synapse/util | |
parent | report image size (bytewise) in OG meta (diff) | |
parent | Merge pull request #686 from matrix-org/markjh/doc_strings (diff) | |
download | synapse-9f7dc2bef7cd39645ae74d96f2919a3f9fdb65ac.tar.xz |
Merge branch 'develop' into matthew/preview_urls
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/async.py | 32 |
1 files changed, 31 insertions, 1 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index 640fae3890..cd4d90f3cf 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,7 +16,8 @@ from twisted.internet import defer, reactor -from .logcontext import PreserveLoggingContext +from .logcontext import PreserveLoggingContext, preserve_fn +from synapse.util import unwrapFirstError @defer.inlineCallbacks @@ -107,3 +108,32 @@ class ObservableDeferred(object): return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % ( id(self), self._result, self._deferred, ) + + +def concurrently_execute(func, args, limit): + """Executes the function with each argument conncurrently while limiting + the number of concurrent executions. + + Args: + func (func): Function to execute, should return a deferred. + args (list): List of arguments to pass to func, each invocation of func + gets a signle argument. + limit (int): Maximum number of conccurent executions. + + Returns: + deferred: Resolved when all function invocations have finished. + """ + it = iter(args) + + @defer.inlineCallbacks + def _concurrently_execute_inner(): + try: + while True: + yield func(it.next()) + except StopIteration: + pass + + return defer.gatherResults([ + preserve_fn(_concurrently_execute_inner)() + for _ in xrange(limit) + ], consumeErrors=True).addErrback(unwrapFirstError) |