summary refs log tree commit diff
path: root/synapse/util/async.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r--synapse/util/async.py89
1 files changed, 88 insertions, 1 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 640fae3890..40be7fe7e3 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -16,7 +16,12 @@
 
 from twisted.internet import defer, reactor
 
-from .logcontext import PreserveLoggingContext
+from .logcontext import (
+    PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+)
+from synapse.util import unwrapFirstError
+
+from contextlib import contextmanager
 
 
 @defer.inlineCallbacks
@@ -97,6 +102,15 @@ class ObservableDeferred(object):
     def observers(self):
         return self._observers
 
+    def has_called(self):
+        return self._result is not None
+
+    def has_succeeded(self):
+        return self._result is not None and self._result[0] is True
+
+    def get_result(self):
+        return self._result[1]
+
     def __getattr__(self, name):
         return getattr(self._deferred, name)
 
@@ -107,3 +121,76 @@ class ObservableDeferred(object):
         return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
             id(self), self._result, self._deferred,
         )
+
+
+def concurrently_execute(func, args, limit):
+    """Executes the function with each argument conncurrently while limiting
+    the number of concurrent executions.
+
+    Args:
+        func (func): Function to execute, should return a deferred.
+        args (list): List of arguments to pass to func, each invocation of func
+            gets a signle argument.
+        limit (int): Maximum number of conccurent executions.
+
+    Returns:
+        deferred: Resolved when all function invocations have finished.
+    """
+    it = iter(args)
+
+    @defer.inlineCallbacks
+    def _concurrently_execute_inner():
+        try:
+            while True:
+                yield func(it.next())
+        except StopIteration:
+            pass
+
+    return defer.gatherResults([
+        preserve_fn(_concurrently_execute_inner)()
+        for _ in xrange(limit)
+    ], consumeErrors=True).addErrback(unwrapFirstError)
+
+
+class Linearizer(object):
+    """Linearizes access to resources based on a key. Useful to ensure only one
+    thing is happening at a time on a given resource.
+
+    Example:
+
+        with (yield linearizer.queue("test_key")):
+            # do some work.
+
+    """
+    def __init__(self):
+        self.key_to_defer = {}
+
+    @defer.inlineCallbacks
+    def queue(self, key):
+        # If there is already a deferred in the queue, we pull it out so that
+        # we can wait on it later.
+        # Then we replace it with a deferred that we resolve *after* the
+        # context manager has exited.
+        # We only return the context manager after the previous deferred has
+        # resolved.
+        # This all has the net effect of creating a chain of deferreds that
+        # wait for the previous deferred before starting their work.
+        current_defer = self.key_to_defer.get(key)
+
+        new_defer = defer.Deferred()
+        self.key_to_defer[key] = new_defer
+
+        if current_defer:
+            yield preserve_context_over_deferred(current_defer)
+
+        @contextmanager
+        def _ctx_manager():
+            try:
+                yield
+            finally:
+                new_defer.callback(None)
+                current_d = self.key_to_defer.get(key)
+                if current_d is new_defer:
+                    self.key_to_defer.pop(key, None)
+
+        defer.returnValue(_ctx_manager())