diff --git a/synapse/util/async.py b/synapse/util/async.py
index 7d5acecb1c..22071ddef7 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import collections
import logging
from contextlib import contextmanager
@@ -248,11 +249,16 @@ class Limiter(object):
# do some work.
"""
- def __init__(self, max_count, clock=None):
+ def __init__(self, max_count=1, name=None, clock=None):
"""
Args:
- max_count(int): The maximum number of concurrent access
+ max_count(int): The maximum number of concurrent accesses
"""
+ if name is None:
+ self.name = id(self)
+ else:
+ self.name = name
+
if not clock:
from twisted.internet import reactor
clock = Clock(reactor)
@@ -260,14 +266,14 @@ class Limiter(object):
self.max_count = max_count
# key_to_defer is a map from the key to a 2 element list where
- # the first element is the number of things executing
- # the second element is a list of deferreds for the things blocked from
+ # the first element is the number of things executing, and
+ # the second element is a deque of deferreds for the things blocked from
# executing.
self.key_to_defer = {}
@defer.inlineCallbacks
def queue(self, key):
- entry = self.key_to_defer.setdefault(key, [0, []])
+ entry = self.key_to_defer.setdefault(key, [0, collections.deque()])
# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
@@ -277,10 +283,10 @@ class Limiter(object):
new_defer = defer.Deferred()
entry[1].append(new_defer)
- logger.info("Waiting to acquire limiter lock for key %r", key)
- with PreserveLoggingContext():
- yield new_defer
- logger.info("Acquired limiter lock for key %r", key)
+ logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key)
+ yield make_deferred_yieldable(new_defer)
+
+ logger.info("Acquired limiter lock %r for key %r", self.name, key)
entry[0] += 1
# if the code holding the lock completes synchronously, then it
@@ -296,7 +302,7 @@ class Limiter(object):
yield self._clock.sleep(0)
else:
- logger.info("Acquired uncontended limiter lock for key %r", key)
+ logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key)
entry[0] += 1
@contextmanager
@@ -304,15 +310,16 @@ class Limiter(object):
try:
yield
finally:
- logger.info("Releasing limiter lock for key %r", key)
+ logger.info("Releasing limiter lock %r for key %r", self.name, key)
# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
entry[0] -= 1
if entry[1]:
- next_def = entry[1].pop(0)
+ next_def = entry[1].popleft()
+ # we need to run the next thing in the sentinel context.
with PreserveLoggingContext():
next_def.callback(None)
elif entry[0] == 0:
|