summary refs log tree commit diff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/util/test_async_helpers.py124
1 files changed, 118 insertions, 6 deletions
diff --git a/tests/util/test_async_helpers.py b/tests/util/test_async_helpers.py
index ff53ce114b..e5bc416de1 100644
--- a/tests/util/test_async_helpers.py
+++ b/tests/util/test_async_helpers.py
@@ -13,6 +13,8 @@
 # limitations under the License.
 import traceback
 
+from parameterized import parameterized_class
+
 from twisted.internet import defer
 from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
 from twisted.internet.task import Clock
@@ -23,10 +25,12 @@ from synapse.logging.context import (
     LoggingContext,
     PreserveLoggingContext,
     current_context,
+    make_deferred_yieldable,
 )
 from synapse.util.async_helpers import (
     ObservableDeferred,
     concurrently_execute,
+    delay_cancellation,
     stop_cancellation,
     timeout_deferred,
 )
@@ -313,13 +317,27 @@ class ConcurrentlyExecuteTest(TestCase):
         self.successResultOf(d2)
 
 
-class StopCancellationTests(TestCase):
-    """Tests for the `stop_cancellation` function."""
+@parameterized_class(
+    ("wrapper",),
+    [("stop_cancellation",), ("delay_cancellation",)],
+)
+class CancellationWrapperTests(TestCase):
+    """Common tests for the `stop_cancellation` and `delay_cancellation` functions."""
+
+    wrapper: str
+
+    def wrap_deferred(self, deferred: "Deferred[str]") -> "Deferred[str]":
+        if self.wrapper == "stop_cancellation":
+            return stop_cancellation(deferred)
+        elif self.wrapper == "delay_cancellation":
+            return delay_cancellation(deferred)
+        else:
+            raise ValueError(f"Unsupported wrapper type: {self.wrapper}")
 
     def test_succeed(self):
         """Test that the new `Deferred` receives the result."""
         deferred: "Deferred[str]" = Deferred()
-        wrapper_deferred = stop_cancellation(deferred)
+        wrapper_deferred = self.wrap_deferred(deferred)
 
         # Success should propagate through.
         deferred.callback("success")
@@ -329,7 +347,7 @@ class StopCancellationTests(TestCase):
     def test_failure(self):
         """Test that the new `Deferred` receives the `Failure`."""
         deferred: "Deferred[str]" = Deferred()
-        wrapper_deferred = stop_cancellation(deferred)
+        wrapper_deferred = self.wrap_deferred(deferred)
 
         # Failure should propagate through.
         deferred.errback(ValueError("abc"))
@@ -337,6 +355,10 @@ class StopCancellationTests(TestCase):
         self.failureResultOf(wrapper_deferred, ValueError)
         self.assertIsNone(deferred.result, "`Failure` was not consumed")
 
+
+class StopCancellationTests(TestCase):
+    """Tests for the `stop_cancellation` function."""
+
     def test_cancellation(self):
         """Test that cancellation of the new `Deferred` leaves the original running."""
         deferred: "Deferred[str]" = Deferred()
@@ -347,11 +369,101 @@ class StopCancellationTests(TestCase):
         self.assertTrue(wrapper_deferred.called)
         self.failureResultOf(wrapper_deferred, CancelledError)
         self.assertFalse(
-            deferred.called, "Original `Deferred` was unexpectedly cancelled."
+            deferred.called, "Original `Deferred` was unexpectedly cancelled"
+        )
+
+        # Now make the original `Deferred` fail.
+        # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed
+        # in logs.
+        deferred.errback(ValueError("abc"))
+        self.assertIsNone(deferred.result, "`Failure` was not consumed")
+
+
+class DelayCancellationTests(TestCase):
+    """Tests for the `delay_cancellation` function."""
+
+    def test_cancellation(self):
+        """Test that cancellation of the new `Deferred` waits for the original."""
+        deferred: "Deferred[str]" = Deferred()
+        wrapper_deferred = delay_cancellation(deferred)
+
+        # Cancel the new `Deferred`.
+        wrapper_deferred.cancel()
+        self.assertNoResult(wrapper_deferred)
+        self.assertFalse(
+            deferred.called, "Original `Deferred` was unexpectedly cancelled"
+        )
+
+        # Now make the original `Deferred` fail.
+        # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed
+        # in logs.
+        deferred.errback(ValueError("abc"))
+        self.assertIsNone(deferred.result, "`Failure` was not consumed")
+
+        # Now that the original `Deferred` has failed, we should get a `CancelledError`.
+        self.failureResultOf(wrapper_deferred, CancelledError)
+
+    def test_suppresses_second_cancellation(self):
+        """Test that a second cancellation is suppressed.
+
+        Identical to `test_cancellation` except the new `Deferred` is cancelled twice.
+        """
+        deferred: "Deferred[str]" = Deferred()
+        wrapper_deferred = delay_cancellation(deferred)
+
+        # Cancel the new `Deferred`, twice.
+        wrapper_deferred.cancel()
+        wrapper_deferred.cancel()
+        self.assertNoResult(wrapper_deferred)
+        self.assertFalse(
+            deferred.called, "Original `Deferred` was unexpectedly cancelled"
         )
 
-        # Now make the inner `Deferred` fail.
+        # Now make the original `Deferred` fail.
         # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed
         # in logs.
         deferred.errback(ValueError("abc"))
         self.assertIsNone(deferred.result, "`Failure` was not consumed")
+
+        # Now that the original `Deferred` has failed, we should get a `CancelledError`.
+        self.failureResultOf(wrapper_deferred, CancelledError)
+
+    def test_propagates_cancelled_error(self):
+        """Test that a `CancelledError` from the original `Deferred` gets propagated."""
+        deferred: "Deferred[str]" = Deferred()
+        wrapper_deferred = delay_cancellation(deferred)
+
+        # Fail the original `Deferred` with a `CancelledError`.
+        cancelled_error = CancelledError()
+        deferred.errback(cancelled_error)
+
+        # The new `Deferred` should fail with exactly the same `CancelledError`.
+        self.assertTrue(wrapper_deferred.called)
+        self.assertIs(cancelled_error, self.failureResultOf(wrapper_deferred).value)
+
+    def test_preserves_logcontext(self):
+        """Test that logging contexts are preserved."""
+        blocking_d: "Deferred[None]" = Deferred()
+
+        async def inner():
+            await make_deferred_yieldable(blocking_d)
+
+        async def outer():
+            with LoggingContext("c") as c:
+                try:
+                    await delay_cancellation(defer.ensureDeferred(inner()))
+                    self.fail("`CancelledError` was not raised")
+                except CancelledError:
+                    self.assertEqual(c, current_context())
+                    # Succeed with no error, unless the logging context is wrong.
+
+        # Run and block inside `inner()`.
+        d = defer.ensureDeferred(outer())
+        self.assertEqual(SENTINEL_CONTEXT, current_context())
+
+        d.cancel()
+
+        # Now unblock. `outer()` will consume the `CancelledError` and check the
+        # logging context.
+        blocking_d.callback(None)
+        self.successResultOf(d)