summary refs log tree commit diff
path: root/synapse/util/async.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r--synapse/util/async.py42
1 files changed, 42 insertions, 0 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index cd4d90f3cf..408c86be91 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -19,6 +19,8 @@ from twisted.internet import defer, reactor
 from .logcontext import PreserveLoggingContext, preserve_fn
 from synapse.util import unwrapFirstError
 
+from contextlib import contextmanager
+
 
 @defer.inlineCallbacks
 def sleep(seconds):
@@ -137,3 +139,43 @@ def concurrently_execute(func, args, limit):
         preserve_fn(_concurrently_execute_inner)()
         for _ in xrange(limit)
     ], consumeErrors=True).addErrback(unwrapFirstError)
+
+
+@contextmanager
+def _trigger_defer_manager(d):
+    try:
+        yield
+    finally:
+        d.callback(None)
+
+
+class Linearizer(object):
+    """Linearizes access to resources based on a key. Useful to ensure only one
+    thing is happening at a time on a given resource.
+
+    Example:
+
+        with (yield linearizer.queue("test_key")):
+            # do some work.
+
+    """
+    def __init__(self):
+        self.key_to_defer = {}
+
+    @defer.inlineCallbacks
+    def queue(self, key):
+        current_defer = self.key_to_defer.get(key)
+
+        new_defer = defer.Deferred()
+        self.key_to_defer[key] = new_defer
+
+        def remove_if_current(_):
+            d = self.key_to_defer.get(key)
+            if d is new_defer:
+                self.key_to_defer.pop(key, None)
+
+        new_defer.addBoth(remove_if_current)
+
+        yield current_defer
+
+        defer.returnValue(_trigger_defer_manager(new_defer))