diff options
author | Richard van der Hoff <richard@matrix.org> | 2018-07-20 12:43:23 +0100 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2018-07-20 12:50:27 +0100 |
commit | 8462c26485fb4f19fc52accc05870c0ea4c8eb6a (patch) | |
tree | bcf4724005702d8798fbda90439439ff4e9c3693 /synapse/util | |
parent | Add a sleep to the Limiter to fix stack overflows. (diff) | |
download | synapse-8462c26485fb4f19fc52accc05870c0ea4c8eb6a.tar.xz |
Improvements to the Limiter
* give them names, to improve logging * use a deque rather than a list for efficiency
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/async.py | 33 |
1 files changed, 20 insertions, 13 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index 7d5acecb1c..22071ddef7 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import collections import logging from contextlib import contextmanager @@ -248,11 +249,16 @@ class Limiter(object): # do some work. """ - def __init__(self, max_count, clock=None): + def __init__(self, max_count=1, name=None, clock=None): """ Args: - max_count(int): The maximum number of concurrent access + max_count(int): The maximum number of concurrent accesses """ + if name is None: + self.name = id(self) + else: + self.name = name + if not clock: from twisted.internet import reactor clock = Clock(reactor) @@ -260,14 +266,14 @@ class Limiter(object): self.max_count = max_count # key_to_defer is a map from the key to a 2 element list where - # the first element is the number of things executing - # the second element is a list of deferreds for the things blocked from + # the first element is the number of things executing, and + # the second element is a deque of deferreds for the things blocked from # executing. self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): - entry = self.key_to_defer.setdefault(key, [0, []]) + entry = self.key_to_defer.setdefault(key, [0, collections.deque()]) # If the number of things executing is greater than the maximum # then add a deferred to the list of blocked items @@ -277,10 +283,10 @@ class Limiter(object): new_defer = defer.Deferred() entry[1].append(new_defer) - logger.info("Waiting to acquire limiter lock for key %r", key) - with PreserveLoggingContext(): - yield new_defer - logger.info("Acquired limiter lock for key %r", key) + logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key) + yield make_deferred_yieldable(new_defer) + + logger.info("Acquired limiter lock %r for key %r", self.name, key) entry[0] += 1 # if the code holding the lock completes synchronously, then it @@ -296,7 +302,7 @@ class Limiter(object): yield self._clock.sleep(0) else: - logger.info("Acquired uncontended limiter lock for key %r", key) + logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key) entry[0] += 1 @contextmanager @@ -304,15 +310,16 @@ class Limiter(object): try: yield finally: - logger.info("Releasing limiter lock for key %r", key) + logger.info("Releasing limiter lock %r for key %r", self.name, key) # We've finished executing so check if there are any things # blocked waiting to execute and start one of them entry[0] -= 1 if entry[1]: - next_def = entry[1].pop(0) + next_def = entry[1].popleft() + # we need to run the next thing in the sentinel context. with PreserveLoggingContext(): next_def.callback(None) elif entry[0] == 0: |