diff options
author | Sean Quah <8349537+squahtx@users.noreply.github.com> | 2022-03-14 17:52:15 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-14 17:52:15 +0000 |
commit | 90b2327066d2343faa86c464a182b6f3c4422ecd (patch) | |
tree | 013f856afd16eda88efcf00415e5ad2024b266f2 /tests/util | |
parent | Deprecate the groups/communities endpoints and add an experimental configurat... (diff) | |
download | synapse-90b2327066d2343faa86c464a182b6f3c4422ecd.tar.xz |
Add `delay_cancellation` utility function (#12180)
`delay_cancellation` behaves like `stop_cancellation`, except it delays `CancelledError`s until the original `Deferred` resolves. This is handy for unifying cleanup paths and ensuring that uncancelled coroutines don't use finished logcontexts. Signed-off-by: Sean Quah <seanq@element.io>
Diffstat (limited to 'tests/util')
-rw-r--r-- | tests/util/test_async_helpers.py | 124 |
1 files changed, 118 insertions, 6 deletions
diff --git a/tests/util/test_async_helpers.py b/tests/util/test_async_helpers.py index ff53ce114b..e5bc416de1 100644 --- a/tests/util/test_async_helpers.py +++ b/tests/util/test_async_helpers.py @@ -13,6 +13,8 @@ # limitations under the License. import traceback +from parameterized import parameterized_class + from twisted.internet import defer from twisted.internet.defer import CancelledError, Deferred, ensureDeferred from twisted.internet.task import Clock @@ -23,10 +25,12 @@ from synapse.logging.context import ( LoggingContext, PreserveLoggingContext, current_context, + make_deferred_yieldable, ) from synapse.util.async_helpers import ( ObservableDeferred, concurrently_execute, + delay_cancellation, stop_cancellation, timeout_deferred, ) @@ -313,13 +317,27 @@ class ConcurrentlyExecuteTest(TestCase): self.successResultOf(d2) -class StopCancellationTests(TestCase): - """Tests for the `stop_cancellation` function.""" +@parameterized_class( + ("wrapper",), + [("stop_cancellation",), ("delay_cancellation",)], +) +class CancellationWrapperTests(TestCase): + """Common tests for the `stop_cancellation` and `delay_cancellation` functions.""" + + wrapper: str + + def wrap_deferred(self, deferred: "Deferred[str]") -> "Deferred[str]": + if self.wrapper == "stop_cancellation": + return stop_cancellation(deferred) + elif self.wrapper == "delay_cancellation": + return delay_cancellation(deferred) + else: + raise ValueError(f"Unsupported wrapper type: {self.wrapper}") def test_succeed(self): """Test that the new `Deferred` receives the result.""" deferred: "Deferred[str]" = Deferred() - wrapper_deferred = stop_cancellation(deferred) + wrapper_deferred = self.wrap_deferred(deferred) # Success should propagate through. deferred.callback("success") @@ -329,7 +347,7 @@ class StopCancellationTests(TestCase): def test_failure(self): """Test that the new `Deferred` receives the `Failure`.""" deferred: "Deferred[str]" = Deferred() - wrapper_deferred = stop_cancellation(deferred) + wrapper_deferred = self.wrap_deferred(deferred) # Failure should propagate through. deferred.errback(ValueError("abc")) @@ -337,6 +355,10 @@ class StopCancellationTests(TestCase): self.failureResultOf(wrapper_deferred, ValueError) self.assertIsNone(deferred.result, "`Failure` was not consumed") + +class StopCancellationTests(TestCase): + """Tests for the `stop_cancellation` function.""" + def test_cancellation(self): """Test that cancellation of the new `Deferred` leaves the original running.""" deferred: "Deferred[str]" = Deferred() @@ -347,11 +369,101 @@ class StopCancellationTests(TestCase): self.assertTrue(wrapper_deferred.called) self.failureResultOf(wrapper_deferred, CancelledError) self.assertFalse( - deferred.called, "Original `Deferred` was unexpectedly cancelled." + deferred.called, "Original `Deferred` was unexpectedly cancelled" + ) + + # Now make the original `Deferred` fail. + # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed + # in logs. + deferred.errback(ValueError("abc")) + self.assertIsNone(deferred.result, "`Failure` was not consumed") + + +class DelayCancellationTests(TestCase): + """Tests for the `delay_cancellation` function.""" + + def test_cancellation(self): + """Test that cancellation of the new `Deferred` waits for the original.""" + deferred: "Deferred[str]" = Deferred() + wrapper_deferred = delay_cancellation(deferred) + + # Cancel the new `Deferred`. + wrapper_deferred.cancel() + self.assertNoResult(wrapper_deferred) + self.assertFalse( + deferred.called, "Original `Deferred` was unexpectedly cancelled" + ) + + # Now make the original `Deferred` fail. + # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed + # in logs. + deferred.errback(ValueError("abc")) + self.assertIsNone(deferred.result, "`Failure` was not consumed") + + # Now that the original `Deferred` has failed, we should get a `CancelledError`. + self.failureResultOf(wrapper_deferred, CancelledError) + + def test_suppresses_second_cancellation(self): + """Test that a second cancellation is suppressed. + + Identical to `test_cancellation` except the new `Deferred` is cancelled twice. + """ + deferred: "Deferred[str]" = Deferred() + wrapper_deferred = delay_cancellation(deferred) + + # Cancel the new `Deferred`, twice. + wrapper_deferred.cancel() + wrapper_deferred.cancel() + self.assertNoResult(wrapper_deferred) + self.assertFalse( + deferred.called, "Original `Deferred` was unexpectedly cancelled" ) - # Now make the inner `Deferred` fail. + # Now make the original `Deferred` fail. # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed # in logs. deferred.errback(ValueError("abc")) self.assertIsNone(deferred.result, "`Failure` was not consumed") + + # Now that the original `Deferred` has failed, we should get a `CancelledError`. + self.failureResultOf(wrapper_deferred, CancelledError) + + def test_propagates_cancelled_error(self): + """Test that a `CancelledError` from the original `Deferred` gets propagated.""" + deferred: "Deferred[str]" = Deferred() + wrapper_deferred = delay_cancellation(deferred) + + # Fail the original `Deferred` with a `CancelledError`. + cancelled_error = CancelledError() + deferred.errback(cancelled_error) + + # The new `Deferred` should fail with exactly the same `CancelledError`. + self.assertTrue(wrapper_deferred.called) + self.assertIs(cancelled_error, self.failureResultOf(wrapper_deferred).value) + + def test_preserves_logcontext(self): + """Test that logging contexts are preserved.""" + blocking_d: "Deferred[None]" = Deferred() + + async def inner(): + await make_deferred_yieldable(blocking_d) + + async def outer(): + with LoggingContext("c") as c: + try: + await delay_cancellation(defer.ensureDeferred(inner())) + self.fail("`CancelledError` was not raised") + except CancelledError: + self.assertEqual(c, current_context()) + # Succeed with no error, unless the logging context is wrong. + + # Run and block inside `inner()`. + d = defer.ensureDeferred(outer()) + self.assertEqual(SENTINEL_CONTEXT, current_context()) + + d.cancel() + + # Now unblock. `outer()` will consume the `CancelledError` and check the + # logging context. + blocking_d.callback(None) + self.successResultOf(d) |