diff options
author | Erik Johnston <erik@matrix.org> | 2016-12-16 10:40:10 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-12-16 10:40:10 +0000 |
commit | f5a4001bb116c468cc5e8e0ae04a1c570e2cb171 (patch) | |
tree | fce7147d9b4422f76b5cec8b53312bb34932d84f /synapse/util/async.py | |
parent | Merge pull request #1685 from matrix-org/rav/update_readme_for_tests (diff) | |
parent | Bump version and changelog (diff) | |
download | synapse-f5a4001bb116c468cc5e8e0ae04a1c570e2cb171.tar.xz |
Merge branch 'release-v0.18.5' of github.com:matrix-org/synapse v0.18.5
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r-- | synapse/util/async.py | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index 347fb1e380..16ed183d4c 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -197,6 +197,64 @@ class Linearizer(object): defer.returnValue(_ctx_manager()) +class Limiter(object): + """Limits concurrent access to resources based on a key. Useful to ensure + only a few thing happen at a time on a given resource. + + Example: + + with (yield limiter.queue("test_key")): + # do some work. + + """ + def __init__(self, max_count): + """ + Args: + max_count(int): The maximum number of concurrent access + """ + self.max_count = max_count + + # key_to_defer is a map from the key to a 2 element list where + # the first element is the number of things executing + # the second element is a list of deferreds for the things blocked from + # executing. + self.key_to_defer = {} + + @defer.inlineCallbacks + def queue(self, key): + entry = self.key_to_defer.setdefault(key, [0, []]) + + # If the number of things executing is greater than the maximum + # then add a deferred to the list of blocked items + # When on of the things currently executing finishes it will callback + # this item so that it can continue executing. + if entry[0] >= self.max_count: + new_defer = defer.Deferred() + entry[1].append(new_defer) + with PreserveLoggingContext(): + yield new_defer + + entry[0] += 1 + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + # We've finished executing so check if there are any things + # blocked waiting to execute and start one of them + entry[0] -= 1 + try: + entry[1].pop(0).callback(None) + except IndexError: + # If nothing else is executing for this key then remove it + # from the map + if entry[0] == 0: + self.key_to_defer.pop(key, None) + + defer.returnValue(_ctx_manager()) + + class ReadWriteLock(object): """A deferred style read write lock. |