diff --git a/synapse/util/async.py b/synapse/util/async.py
index 640fae3890..347fb1e380 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -16,7 +16,12 @@
from twisted.internet import defer, reactor
-from .logcontext import PreserveLoggingContext
+from .logcontext import (
+ PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+)
+from synapse.util import unwrapFirstError
+
+from contextlib import contextmanager
@defer.inlineCallbacks
@@ -97,6 +102,15 @@ class ObservableDeferred(object):
def observers(self):
return self._observers
+ def has_called(self):
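+        # _result is a (success, value) tuple, set once the wrapped deferred
+        # fires; None means it has not completed yet.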
+ return self._result is not None
+
+ def has_succeeded(self):
+ return self._result is not None and self._result[0] is True
+
+ def get_result(self):
+ return self._result[1]
+
def __getattr__(self, name):
return getattr(self._deferred, name)
@@ -107,3 +121,159 @@ class ObservableDeferred(object):
return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
id(self), self._result, self._deferred,
)
+
+
+def concurrently_execute(func, args, limit):
+    """Executes the function with each argument concurrently while limiting
+    the number of concurrent executions.
+
+    Args:
+        func (func): Function to execute, should return a deferred.
+        args (list): List of arguments to pass to func, each invocation of func
+            gets a single argument.
+        limit (int): Maximum number of concurrent executions.
+
+    Returns:
+        deferred: Resolved when all function invocations have finished.
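+
+    Example (a minimal illustrative sketch; ``fetch_event`` stands in for any
+    hypothetical function that returns a deferred):
+
+        yield concurrently_execute(fetch_event, event_ids, limit=5)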
+ """
+ it = iter(args)
+
+ @defer.inlineCallbacks
+ def _concurrently_execute_inner():
+ try:
+ while True:
+ yield func(it.next())
+ except StopIteration:
+ pass
+
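+    # Start `limit` workers, each of which repeatedly pulls the next argument
+    # off the shared iterator until it is exhausted. consumeErrors=True plus
+    # unwrapFirstError surfaces the first underlying failure to the caller
+    # instead of Twisted's FirstError wrapper.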
+ return preserve_context_over_deferred(defer.gatherResults([
+ preserve_fn(_concurrently_execute_inner)()
+ for _ in xrange(limit)
+ ], consumeErrors=True)).addErrback(unwrapFirstError)
+
+
+class Linearizer(object):
+ """Linearizes access to resources based on a key. Useful to ensure only one
+ thing is happening at a time on a given resource.
+
+ Example:
+
+ with (yield linearizer.queue("test_key")):
+ # do some work.
+
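+    Note that the ``yield`` must happen inside a function decorated with
+    ``defer.inlineCallbacks``, since ``queue`` returns a deferred that
+    resolves to a context manager.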
+ """
+ def __init__(self):
+ self.key_to_defer = {}
+
+ @defer.inlineCallbacks
+ def queue(self, key):
+ # If there is already a deferred in the queue, we pull it out so that
+ # we can wait on it later.
+ # Then we replace it with a deferred that we resolve *after* the
+ # context manager has exited.
+ # We only return the context manager after the previous deferred has
+ # resolved.
+ # This all has the net effect of creating a chain of deferreds that
+ # wait for the previous deferred before starting their work.
+ current_defer = self.key_to_defer.get(key)
+
+ new_defer = defer.Deferred()
+ self.key_to_defer[key] = new_defer
+
+ if current_defer:
+ with PreserveLoggingContext():
+ yield current_defer
+
+ @contextmanager
+ def _ctx_manager():
+ try:
+ yield
+ finally:
+ new_defer.callback(None)
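+                # Remove our entry from the map, but only if no later caller
+                # has replaced it (i.e. no one is queued up behind us).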
+ current_d = self.key_to_defer.get(key)
+ if current_d is new_defer:
+ self.key_to_defer.pop(key, None)
+
+ defer.returnValue(_ctx_manager())
+
+
+class ReadWriteLock(object):
+    """A deferred-style read/write lock.
+
+ Example:
+
+ with (yield read_write_lock.read("test_key")):
+ # do some work
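+
+        with (yield read_write_lock.write("test_key")):
+            # do some exclusive work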
+ """
+
+    # IMPLEMENTATION NOTES
+    #
+    # We track the most recent queued reader and writer deferreds (which get
+    # resolved when they release the lock).
+    #
+    # Read: We know it's safe to acquire a read lock when the latest writer has
+    # been resolved. The new reader is appended to the list of latest readers.
+    #
+    # Write: We know it's safe to acquire the write lock when both the latest
+    # writer and all latest readers have been resolved. The new writer replaces
+    # the latest writer.
+
+ def __init__(self):
+ # Latest readers queued
+ self.key_to_current_readers = {}
+
+ # Latest writer queued
+ self.key_to_current_writer = {}
+
+ @defer.inlineCallbacks
+ def read(self, key):
+ new_defer = defer.Deferred()
+
+ curr_readers = self.key_to_current_readers.setdefault(key, set())
+ curr_writer = self.key_to_current_writer.get(key, None)
+
+ curr_readers.add(new_defer)
+
+ # We wait for the latest writer to finish writing. We can safely ignore
+ # any existing readers... as they're readers.
+ yield curr_writer
+
+ @contextmanager
+ def _ctx_manager():
+ try:
+ yield
+ finally:
+ new_defer.callback(None)
+ self.key_to_current_readers.get(key, set()).discard(new_defer)
+
+ defer.returnValue(_ctx_manager())
+
+ @defer.inlineCallbacks
+ def write(self, key):
+ new_defer = defer.Deferred()
+
+ curr_readers = self.key_to_current_readers.get(key, set())
+ curr_writer = self.key_to_current_writer.get(key, None)
+
+ # We wait on all latest readers and writer.
+ to_wait_on = list(curr_readers)
+ if curr_writer:
+ to_wait_on.append(curr_writer)
+
+ # We can clear the list of current readers since the new writer waits
+ # for them to finish.
+ curr_readers.clear()
+ self.key_to_current_writer[key] = new_defer
+
+ yield preserve_context_over_deferred(defer.gatherResults(to_wait_on))
+
+ @contextmanager
+ def _ctx_manager():
+ try:
+ yield
+ finally:
+ new_defer.callback(None)
+ if self.key_to_current_writer[key] == new_defer:
+ self.key_to_current_writer.pop(key)
+
+ defer.returnValue(_ctx_manager())