diff options
author | Erik Johnston <erik@matrix.org> | 2016-04-07 16:35:40 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-04-07 16:35:40 +0100 |
commit | a294b04bf09bb2292485f7946b6ae8854910dc24 (patch) | |
tree | 4639be3f9543b4a9090654f4ecc781d3b4398a6d /synapse/util/async.py | |
parent | Merge pull request #703 from matrix-org/erikj/member (diff) | |
parent | Rename things (diff) | |
download | synapse-a294b04bf09bb2292485f7946b6ae8854910dc24.tar.xz |
Merge pull request #700 from matrix-org/erikj/deduplicate_joins
Deduplicate membership changes
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r-- | synapse/util/async.py | 50 |
1 files changed, 49 insertions, 1 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index cd4d90f3cf..0d6f48e2d8 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,9 +16,13 @@ from twisted.internet import defer, reactor -from .logcontext import PreserveLoggingContext, preserve_fn +from .logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, +) from synapse.util import unwrapFirstError +from contextlib import contextmanager + @defer.inlineCallbacks def sleep(seconds): @@ -137,3 +141,47 @@ def concurrently_execute(func, args, limit): preserve_fn(_concurrently_execute_inner)() for _ in xrange(limit) ], consumeErrors=True).addErrback(unwrapFirstError) + + +class Linearizer(object): + """Linearizes access to resources based on a key. Useful to ensure only one + thing is happening at a time on a given resource. + + Example: + + with (yield linearizer.queue("test_key")): + # do some work. + + """ + def __init__(self): + self.key_to_defer = {} + + @defer.inlineCallbacks + def queue(self, key): + # If there is already a deferred in the queue, we pull it out so that + # we can wait on it later. + # Then we replace it with a deferred that we resolve *after* the + # context manager has exited. + # We only return the context manager after the previous deferred has + # resolved. + # This all has the net effect of creating a chain of deferreds that + # wait for the previous deferred before starting their work. + current_defer = self.key_to_defer.get(key) + + new_defer = defer.Deferred() + self.key_to_defer[key] = new_defer + + if current_defer: + yield preserve_context_over_deferred(current_defer) + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + new_defer.callback(None) + current_d = self.key_to_defer.get(key) + if current_d is new_defer: + self.key_to_defer.pop(key, None) + + defer.returnValue(_ctx_manager()) |