summary refs log tree commit diff
path: root/synapse/util/async.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-07-23 17:15:12 +0100
committerRichard van der Hoff <richard@matrix.org>2018-07-23 17:15:12 +0100
commit4f5cc8e4e796153cbdc09517e521fc3a23eb66c2 (patch)
treeccaa25ba8cec666b88ac217f6495cf59663550b0 /synapse/util/async.py
parentRemove redundant checks on room forgottenness (diff)
parentMerge pull request #3582 from matrix-org/erikj/fixup_stateless (diff)
downloadsynapse-4f5cc8e4e796153cbdc09517e521fc3a23eb66c2.tar.xz
Merge remote-tracking branch 'origin/develop' into rav/remove_who_forgot_in_room
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r--synapse/util/async.py28
1 files changed, 22 insertions, 6 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 5a50d9700f..a7094e2fb4 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -184,13 +184,13 @@ class Linearizer(object):
 
         # key_to_defer is a map from the key to a 2 element list where
         # the first element is the number of things executing, and
-        # the second element is a deque of deferreds for the things blocked from
-        # executing.
+        # the second element is an OrderedDict, where the keys are deferreds for the
+        # things blocked from executing.
         self.key_to_defer = {}
 
     @defer.inlineCallbacks
     def queue(self, key):
-        entry = self.key_to_defer.setdefault(key, [0, collections.deque()])
+        entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
 
         # If the number of things executing is greater than the maximum
         # then add a deferred to the list of blocked items
@@ -198,12 +198,28 @@ class Linearizer(object):
         # this item so that it can continue executing.
         if entry[0] >= self.max_count:
             new_defer = defer.Deferred()
-            entry[1].append(new_defer)
+            entry[1][new_defer] = 1
 
             logger.info(
                 "Waiting to acquire linearizer lock %r for key %r", self.name, key,
             )
-            yield make_deferred_yieldable(new_defer)
+            try:
+                yield make_deferred_yieldable(new_defer)
+            except Exception as e:
+                if isinstance(e, CancelledError):
+                    logger.info(
+                        "Cancelling wait for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+                else:
+                    logger.warn(
+                        "Unexpected exception waiting for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+
+                # we just have to take ourselves back out of the queue.
+                del entry[1][new_defer]
+                raise
 
             logger.info("Acquired linearizer lock %r for key %r", self.name, key)
             entry[0] += 1
@@ -238,7 +254,7 @@ class Linearizer(object):
                 entry[0] -= 1
 
                 if entry[1]:
-                    next_def = entry[1].popleft()
+                    (next_def, _) = entry[1].popitem(last=False)
 
                     # we need to run the next thing in the sentinel context.
                     with PreserveLoggingContext():