summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-07-20 13:59:55 +0100
committerRichard van der Hoff <richard@matrix.org>2018-07-20 13:59:55 +0100
commit3d6df846580ce6ef8769945e6990af2f44251e40 (patch)
tree01fe71f39a23e0b35b214b31e38e9a541a22a167 /synapse/util
parentMerge pull request #3571 from matrix-org/rav/limiter_fixes (diff)
downloadsynapse-3d6df846580ce6ef8769945e6990af2f44251e40.tar.xz
Test and fix support for cancellation in Linearizer
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/async.py28
1 files changed, 22 insertions, 6 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 5a50d9700f..a7094e2fb4 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -184,13 +184,13 @@ class Linearizer(object):
 
         # key_to_defer is a map from the key to a 2 element list where
         # the first element is the number of things executing, and
-        # the second element is a deque of deferreds for the things blocked from
-        # executing.
+        # the second element is an OrderedDict, where the keys are deferreds for the
+        # things blocked from executing.
         self.key_to_defer = {}
 
     @defer.inlineCallbacks
     def queue(self, key):
-        entry = self.key_to_defer.setdefault(key, [0, collections.deque()])
+        entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
 
         # If the number of things executing is greater than the maximum
         # then add a deferred to the list of blocked items
@@ -198,12 +198,28 @@ class Linearizer(object):
         # this item so that it can continue executing.
         if entry[0] >= self.max_count:
             new_defer = defer.Deferred()
-            entry[1].append(new_defer)
+            entry[1][new_defer] = 1
 
             logger.info(
                 "Waiting to acquire linearizer lock %r for key %r", self.name, key,
             )
-            yield make_deferred_yieldable(new_defer)
+            try:
+                yield make_deferred_yieldable(new_defer)
+            except Exception as e:
+                if isinstance(e, CancelledError):
+                    logger.info(
+                        "Cancelling wait for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+                else:
+                    logger.warn(
+                        "Unexpected exception waiting for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+
+                # we just have to take ourselves back out of the queue.
+                del entry[1][new_defer]
+                raise
 
             logger.info("Acquired linearizer lock %r for key %r", self.name, key)
             entry[0] += 1
@@ -238,7 +254,7 @@ class Linearizer(object):
                 entry[0] -= 1
 
                 if entry[1]:
-                    next_def = entry[1].popleft()
+                    (next_def, _) = entry[1].popitem(last=False)
 
                     # we need to run the next thing in the sentinel context.
                     with PreserveLoggingContext():