summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2018-07-20 14:44:02 +0100
committerGitHub <noreply@github.com>2018-07-20 14:44:02 +0100
commitff48ab8527fb51c338ae79c583583f000263c453 (patch)
tree899ddb3c55d95502cd1d6b93a8f3b78edb8a7e79
parentMerge pull request #3571 from matrix-org/rav/limiter_fixes (diff)
parentChangelog (diff)
downloadsynapse-ff48ab8527fb51c338ae79c583583f000263c453.tar.xz
Merge pull request #3572 from matrix-org/rav/linearizer_cancellation
Test and fix support for cancellation in Linearizer
-rw-r--r--changelog.d/3572.misc1
-rw-r--r--synapse/util/async.py28
-rw-r--r--tests/util/test_linearizer.py31
3 files changed, 54 insertions, 6 deletions
diff --git a/changelog.d/3572.misc b/changelog.d/3572.misc
new file mode 100644
index 0000000000..8908324e68
--- /dev/null
+++ b/changelog.d/3572.misc
@@ -0,0 +1 @@
+Merge Linearizer and Limiter
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 5a50d9700f..a7094e2fb4 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -184,13 +184,13 @@ class Linearizer(object):
 
         # key_to_defer is a map from the key to a 2 element list where
         # the first element is the number of things executing, and
-        # the second element is a deque of deferreds for the things blocked from
-        # executing.
+        # the second element is an OrderedDict, where the keys are deferreds for the
+        # things blocked from executing.
         self.key_to_defer = {}
 
     @defer.inlineCallbacks
     def queue(self, key):
-        entry = self.key_to_defer.setdefault(key, [0, collections.deque()])
+        entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
 
         # If the number of things executing is greater than the maximum
         # then add a deferred to the list of blocked items
@@ -198,12 +198,28 @@ class Linearizer(object):
         # this item so that it can continue executing.
         if entry[0] >= self.max_count:
             new_defer = defer.Deferred()
-            entry[1].append(new_defer)
+            entry[1][new_defer] = 1
 
             logger.info(
                 "Waiting to acquire linearizer lock %r for key %r", self.name, key,
             )
-            yield make_deferred_yieldable(new_defer)
+            try:
+                yield make_deferred_yieldable(new_defer)
+            except Exception as e:
+                if isinstance(e, CancelledError):
+                    logger.info(
+                        "Cancelling wait for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+                else:
+                    logger.warn(
+                        "Unexpected exception waiting for linearizer lock %r for key %r",
+                        self.name, key,
+                    )
+
+                # we just have to take ourselves back out of the queue.
+                del entry[1][new_defer]
+                raise
 
             logger.info("Acquired linearizer lock %r for key %r", self.name, key)
             entry[0] += 1
@@ -238,7 +254,7 @@ class Linearizer(object):
                 entry[0] -= 1
 
                 if entry[1]:
-                    next_def = entry[1].popleft()
+                    (next_def, _) = entry[1].popitem(last=False)
 
                     # we need to run the next thing in the sentinel context.
                     with PreserveLoggingContext():
diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py
index c9563124f9..4729bd5a0a 100644
--- a/tests/util/test_linearizer.py
+++ b/tests/util/test_linearizer.py
@@ -17,6 +17,7 @@
 from six.moves import range
 
 from twisted.internet import defer, reactor
+from twisted.internet.defer import CancelledError
 
 from synapse.util import Clock, logcontext
 from synapse.util.async import Linearizer
@@ -112,3 +113,33 @@ class LinearizerTestCase(unittest.TestCase):
         d6 = limiter.queue(key)
         with (yield d6):
             pass
+
+    @defer.inlineCallbacks
+    def test_cancellation(self):
+        linearizer = Linearizer()
+
+        key = object()
+
+        d1 = linearizer.queue(key)
+        cm1 = yield d1
+
+        d2 = linearizer.queue(key)
+        self.assertFalse(d2.called)
+
+        d3 = linearizer.queue(key)
+        self.assertFalse(d3.called)
+
+        d2.cancel()
+
+        with cm1:
+            pass
+
+        self.assertTrue(d2.called)
+        try:
+            yield d2
+            self.fail("Expected d2 to raise CancelledError")
+        except CancelledError:
+            pass
+
+        with (yield d3):
+            pass