Deduplicate joins
1 files changed, 42 insertions, 0 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index cd4d90f3cf..408c86be91 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -19,6 +19,8 @@ from twisted.internet import defer, reactor
from .logcontext import PreserveLoggingContext, preserve_fn
from synapse.util import unwrapFirstError
+from contextlib import contextmanager
+
@defer.inlineCallbacks
def sleep(seconds):
@@ -137,3 +139,43 @@ def concurrently_execute(func, args, limit):
preserve_fn(_concurrently_execute_inner)()
for _ in xrange(limit)
], consumeErrors=True).addErrback(unwrapFirstError)
+
+
+@contextmanager
+def _trigger_defer_manager(d):
+ try:
+ yield
+ finally:
+ d.callback(None)
+
+
+class Linearizer(object):
+ """Linearizes access to resources based on a key. Useful to ensure only one
+ thing is happening at a time on a given resource.
+
+ Example:
+
+ with (yield linearizer.queue("test_key")):
+ # do some work.
+
+ """
+ def __init__(self):
+ self.key_to_defer = {}
+
+ @defer.inlineCallbacks
+ def queue(self, key):
+ current_defer = self.key_to_defer.get(key)
+
+ new_defer = defer.Deferred()
+ self.key_to_defer[key] = new_defer
+
+ def remove_if_current(_):
+ d = self.key_to_defer.get(key)
+ if d is new_defer:
+ self.key_to_defer.pop(key, None)
+
+ new_defer.addBoth(remove_if_current)
+
+ yield current_defer
+
+ defer.returnValue(_trigger_defer_manager(new_defer))
|