From d073cb7ead62314ff14c59a9aaeac4f5f8470dd6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Nov 2016 16:29:51 +0000 Subject: Add Limiter: limit concurrent access to resource --- synapse/util/async.py | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) (limited to 'synapse/util/async.py') diff --git a/synapse/util/async.py b/synapse/util/async.py index 347fb1e380..2a680fff5e 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -197,6 +197,51 @@ class Linearizer(object): defer.returnValue(_ctx_manager()) +class Limiter(object): + """Limits concurrent access to resources based on a key. Useful to ensure + only a few thing happen at a time on a given resource. + + Example: + + with (yield limiter.queue("test_key")): + # do some work. + + """ + def __init__(self, max_count): + """ + Args: + max_count(int): The maximum number of concurrent access + """ + self.max_count = max_count + self.key_to_defer = {} + + @defer.inlineCallbacks + def queue(self, key): + entry = self.key_to_defer.setdefault(key, [0, []]) + + if entry[0] >= self.max_count: + new_defer = defer.Deferred() + entry[1].append(new_defer) + with PreserveLoggingContext(): + yield new_defer + + entry[0] += 1 + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + entry[0] -= 1 + try: + entry[1].pop(0).callback(None) + except IndexError: + if entry[0] == 0: + self.key_to_defer.pop(key, None) + + defer.returnValue(_ctx_manager()) + + class ReadWriteLock(object): """A deferred style read write lock. -- cgit 1.5.1 From 64038b806cff6fea0a190b3d310225ede8abc5af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 11 Nov 2016 10:42:08 +0000 Subject: Comments --- synapse/handlers/message.py | 2 ++ synapse/util/async.py | 13 +++++++++++++ 2 files changed, 15 insertions(+) (limited to 'synapse/util/async.py') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 90519e33e3..4d0515ddfb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -50,6 +50,8 @@ class MessageHandler(BaseHandler): self.pagination_lock = ReadWriteLock() + # We arbitrarily limit concurrent event creation for a room to 5. + # This is to stop us from diverging history *too* much. self.limiter = Limiter(max_count=5) @defer.inlineCallbacks diff --git a/synapse/util/async.py b/synapse/util/async.py index 2a680fff5e..16ed183d4c 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -213,12 +213,21 @@ class Limiter(object): max_count(int): The maximum number of concurrent access """ self.max_count = max_count + + # key_to_defer is a map from the key to a 2 element list where + # the first element is the number of things executing + # the second element is a list of deferreds for the things blocked from + # executing. self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): entry = self.key_to_defer.setdefault(key, [0, []]) + # If the number of things executing is greater than the maximum + # then add a deferred to the list of blocked items + # When on of the things currently executing finishes it will callback + # this item so that it can continue executing. if entry[0] >= self.max_count: new_defer = defer.Deferred() entry[1].append(new_defer) @@ -232,10 +241,14 @@ class Limiter(object): try: yield finally: + # We've finished executing so check if there are any things + # blocked waiting to execute and start one of them entry[0] -= 1 try: entry[1].pop(0).callback(None) except IndexError: + # If nothing else is executing for this key then remove it + # from the map if entry[0] == 0: self.key_to_defer.pop(key, None) -- cgit 1.5.1