diff --git a/synapse/util/async.py b/synapse/util/async.py
index 5a50d9700f..a7094e2fb4 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -184,13 +184,13 @@ class Linearizer(object):
# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing, and
- # the second element is a deque of deferreds for the things blocked from
- # executing.
+ # the second element is an OrderedDict, where the keys are deferreds for the
+ # things blocked from executing.
self.key_to_defer = {}
@defer.inlineCallbacks
def queue(self, key):
- entry = self.key_to_defer.setdefault(key, [0, collections.deque()])
+ entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
@@ -198,12 +198,28 @@ class Linearizer(object):
# this item so that it can continue executing.
if entry[0] >= self.max_count:
new_defer = defer.Deferred()
- entry[1].append(new_defer)
+ entry[1][new_defer] = 1
logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
)
- yield make_deferred_yieldable(new_defer)
+ try:
+ yield make_deferred_yieldable(new_defer)
+ except Exception as e:
+ if isinstance(e, CancelledError):
+ logger.info(
+ "Cancelling wait for linearizer lock %r for key %r",
+ self.name, key,
+ )
+ else:
+ logger.warn(
+ "Unexpected exception waiting for linearizer lock %r for key %r",
+ self.name, key,
+ )
+
+ # we just have to take ourselves back out of the queue.
+ del entry[1][new_defer]
+ raise
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1
@@ -238,7 +254,7 @@ class Linearizer(object):
entry[0] -= 1
if entry[1]:
- next_def = entry[1].popleft()
+ (next_def, _) = entry[1].popitem(last=False)
# we need to run the next thing in the sentinel context.
with PreserveLoggingContext():
|