diff options
author | Richard van der Hoff <richard@matrix.org> | 2018-07-20 13:59:55 +0100 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2018-07-20 13:59:55 +0100 |
commit | 3d6df846580ce6ef8769945e6990af2f44251e40 (patch) | |
tree | 01fe71f39a23e0b35b214b31e38e9a541a22a167 /synapse/util | |
parent | Merge pull request #3571 from matrix-org/rav/limiter_fixes (diff) | |
download | synapse-3d6df846580ce6ef8769945e6990af2f44251e40.tar.xz |
Test and fix support for cancellation in Linearizer
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/async.py | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index 5a50d9700f..a7094e2fb4 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -184,13 +184,13 @@ class Linearizer(object): # key_to_defer is a map from the key to a 2 element list where # the first element is the number of things executing, and - # the second element is a deque of deferreds for the things blocked from - # executing. + # the second element is an OrderedDict, where the keys are deferreds for the + # things blocked from executing. self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): - entry = self.key_to_defer.setdefault(key, [0, collections.deque()]) + entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()]) # If the number of things executing is greater than the maximum # then add a deferred to the list of blocked items @@ -198,12 +198,28 @@ class Linearizer(object): # this item so that it can continue executing. if entry[0] >= self.max_count: new_defer = defer.Deferred() - entry[1].append(new_defer) + entry[1][new_defer] = 1 logger.info( "Waiting to acquire linearizer lock %r for key %r", self.name, key, ) - yield make_deferred_yieldable(new_defer) + try: + yield make_deferred_yieldable(new_defer) + except Exception as e: + if isinstance(e, CancelledError): + logger.info( + "Cancelling wait for linearizer lock %r for key %r", + self.name, key, + ) + else: + logger.warn( + "Unexpected exception waiting for linearizer lock %r for key %r", + self.name, key, + ) + + # we just have to take ourselves back out of the queue. + del entry[1][new_defer] + raise logger.info("Acquired linearizer lock %r for key %r", self.name, key) entry[0] += 1 @@ -238,7 +254,7 @@ class Linearizer(object): entry[0] -= 1 if entry[1]: - next_def = entry[1].popleft() + (next_def, _) = entry[1].popitem(last=False) # we need to run the next thing in the sentinel context. with PreserveLoggingContext(): |