diff options
Diffstat (limited to 'tests/util/test_async_helpers.py')
-rw-r--r-- | tests/util/test_async_helpers.py | 118 |
1 files changed, 63 insertions, 55 deletions
diff --git a/tests/util/test_async_helpers.py b/tests/util/test_async_helpers.py index 9d5010bf92..91cac9822a 100644 --- a/tests/util/test_async_helpers.py +++ b/tests/util/test_async_helpers.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import traceback +from typing import Generator, List, NoReturn, Optional from parameterized import parameterized_class @@ -41,8 +42,8 @@ from tests.unittest import TestCase class ObservableDeferredTest(TestCase): - def test_succeed(self): - origin_d = Deferred() + def test_succeed(self) -> None: + origin_d: "Deferred[int]" = Deferred() observable = ObservableDeferred(origin_d) observer1 = observable.observe() @@ -52,16 +53,18 @@ class ObservableDeferredTest(TestCase): self.assertFalse(observer2.called) # check the first observer is called first - def check_called_first(res): + def check_called_first(res: int) -> int: self.assertFalse(observer2.called) return res observer1.addBoth(check_called_first) # store the results - results = [None, None] + results: List[Optional[ObservableDeferred[int]]] = [None, None] - def check_val(res, idx): + def check_val( + res: ObservableDeferred[int], idx: int + ) -> ObservableDeferred[int]: results[idx] = res return res @@ -72,8 +75,8 @@ class ObservableDeferredTest(TestCase): self.assertEqual(results[0], 123, "observer 1 callback result") self.assertEqual(results[1], 123, "observer 2 callback result") - def test_failure(self): - origin_d = Deferred() + def test_failure(self) -> None: + origin_d: Deferred = Deferred() observable = ObservableDeferred(origin_d, consumeErrors=True) observer1 = observable.observe() @@ -83,16 +86,16 @@ class ObservableDeferredTest(TestCase): self.assertFalse(observer2.called) # check the first observer is called first - def check_called_first(res): + def check_called_first(res: int) -> int: self.assertFalse(observer2.called) return res observer1.addBoth(check_called_first) # store the results - results = [None, None] + results: List[Optional[ObservableDeferred[str]]] = [None, None] - def check_val(res, idx): + def check_val(res: ObservableDeferred[str], idx: int) -> None: results[idx] = res return None @@ -103,10 +106,12 @@ class ObservableDeferredTest(TestCase): raise Exception("gah!") except Exception as e: origin_d.errback(e) + assert results[0] is not None self.assertEqual(str(results[0].value), "gah!", "observer 1 errback result") + assert results[1] is not None self.assertEqual(str(results[1].value), "gah!", "observer 2 errback result") - def test_cancellation(self): + def test_cancellation(self) -> None: """Test that cancelling an observer does not affect other observers.""" origin_d: "Deferred[int]" = Deferred() observable = ObservableDeferred(origin_d, consumeErrors=True) @@ -136,37 +141,38 @@ class ObservableDeferredTest(TestCase): class TimeoutDeferredTest(TestCase): - def setUp(self): + def setUp(self) -> None: self.clock = Clock() - def test_times_out(self): + def test_times_out(self) -> None: """Basic test case that checks that the original deferred is cancelled and that the timing-out deferred is errbacked """ - cancelled = [False] + cancelled = False - def canceller(_d): - cancelled[0] = True + def canceller(_d: Deferred) -> None: + nonlocal cancelled + cancelled = True - non_completing_d = Deferred(canceller) + non_completing_d: Deferred = Deferred(canceller) timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock) self.assertNoResult(timing_out_d) - self.assertFalse(cancelled[0], "deferred was cancelled prematurely") + self.assertFalse(cancelled, "deferred was cancelled prematurely") self.clock.pump((1.0,)) - self.assertTrue(cancelled[0], "deferred was not cancelled by timeout") + self.assertTrue(cancelled, "deferred was not cancelled by timeout") self.failureResultOf(timing_out_d, defer.TimeoutError) - def test_times_out_when_canceller_throws(self): + def test_times_out_when_canceller_throws(self) -> None: """Test that we have successfully worked around https://twistedmatrix.com/trac/ticket/9534""" - def canceller(_d): + def canceller(_d: Deferred) -> None: raise Exception("can't cancel this deferred") - non_completing_d = Deferred(canceller) + non_completing_d: Deferred = Deferred(canceller) timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock) self.assertNoResult(timing_out_d) @@ -175,22 +181,24 @@ class TimeoutDeferredTest(TestCase): self.failureResultOf(timing_out_d, defer.TimeoutError) - def test_logcontext_is_preserved_on_cancellation(self): - blocking_was_cancelled = [False] + def test_logcontext_is_preserved_on_cancellation(self) -> None: + blocking_was_cancelled = False @defer.inlineCallbacks - def blocking(): - non_completing_d = Deferred() + def blocking() -> Generator["Deferred[object]", object, None]: + nonlocal blocking_was_cancelled + + non_completing_d: Deferred = Deferred() with PreserveLoggingContext(): try: yield non_completing_d except CancelledError: - blocking_was_cancelled[0] = True + blocking_was_cancelled = True raise with LoggingContext("one") as context_one: # the errbacks should be run in the test logcontext - def errback(res, deferred_name): + def errback(res: Failure, deferred_name: str) -> Failure: self.assertIs( current_context(), context_one, @@ -209,7 +217,7 @@ class TimeoutDeferredTest(TestCase): self.clock.pump((1.0,)) self.assertTrue( - blocking_was_cancelled[0], "non-completing deferred was not cancelled" + blocking_was_cancelled, "non-completing deferred was not cancelled" ) self.failureResultOf(timing_out_d, defer.TimeoutError) self.assertIs(current_context(), context_one) @@ -220,13 +228,13 @@ class _TestException(Exception): class ConcurrentlyExecuteTest(TestCase): - def test_limits_runners(self): + def test_limits_runners(self) -> None: """If we have more tasks than runners, we should get the limit of runners""" started = 0 waiters = [] processed = [] - async def callback(v): + async def callback(v: int) -> None: # when we first enter, bump the start count nonlocal started started += 1 @@ -235,7 +243,7 @@ class ConcurrentlyExecuteTest(TestCase): processed.append(v) # wait for the goahead before returning - d2 = Deferred() + d2: "Deferred[int]" = Deferred() waiters.append(d2) await d2 @@ -265,16 +273,16 @@ class ConcurrentlyExecuteTest(TestCase): self.assertCountEqual(processed, [1, 2, 3, 4, 5]) self.successResultOf(d2) - def test_preserves_stacktraces(self): + def test_preserves_stacktraces(self) -> None: """Test that the stacktrace from an exception thrown in the callback is preserved""" - d1 = Deferred() + d1: "Deferred[int]" = Deferred() - async def callback(v): + async def callback(v: int) -> None: # alas, this doesn't work at all without an await here await d1 raise _TestException("bah") - async def caller(): + async def caller() -> None: try: await concurrently_execute(callback, [1], 2) except _TestException as e: @@ -290,17 +298,17 @@ class ConcurrentlyExecuteTest(TestCase): d1.callback(0) self.successResultOf(d2) - def test_preserves_stacktraces_on_preformed_failure(self): + def test_preserves_stacktraces_on_preformed_failure(self) -> None: """Test that the stacktrace on a Failure returned by the callback is preserved""" - d1 = Deferred() + d1: "Deferred[int]" = Deferred() f = Failure(_TestException("bah")) - async def callback(v): + async def callback(v: int) -> None: # alas, this doesn't work at all without an await here await d1 await defer.fail(f) - async def caller(): + async def caller() -> None: try: await concurrently_execute(callback, [1], 2) except _TestException as e: @@ -336,7 +344,7 @@ class CancellationWrapperTests(TestCase): else: raise ValueError(f"Unsupported wrapper type: {self.wrapper}") - def test_succeed(self): + def test_succeed(self) -> None: """Test that the new `Deferred` receives the result.""" deferred: "Deferred[str]" = Deferred() wrapper_deferred = self.wrap_deferred(deferred) @@ -346,7 +354,7 @@ class CancellationWrapperTests(TestCase): self.assertTrue(wrapper_deferred.called) self.assertEqual("success", self.successResultOf(wrapper_deferred)) - def test_failure(self): + def test_failure(self) -> None: """Test that the new `Deferred` receives the `Failure`.""" deferred: "Deferred[str]" = Deferred() wrapper_deferred = self.wrap_deferred(deferred) @@ -361,7 +369,7 @@ class CancellationWrapperTests(TestCase): class StopCancellationTests(TestCase): """Tests for the `stop_cancellation` function.""" - def test_cancellation(self): + def test_cancellation(self) -> None: """Test that cancellation of the new `Deferred` leaves the original running.""" deferred: "Deferred[str]" = Deferred() wrapper_deferred = stop_cancellation(deferred) @@ -384,7 +392,7 @@ class StopCancellationTests(TestCase): class DelayCancellationTests(TestCase): """Tests for the `delay_cancellation` function.""" - def test_deferred_cancellation(self): + def test_deferred_cancellation(self) -> None: """Test that cancellation of the new `Deferred` waits for the original.""" deferred: "Deferred[str]" = Deferred() wrapper_deferred = delay_cancellation(deferred) @@ -405,12 +413,12 @@ class DelayCancellationTests(TestCase): # Now that the original `Deferred` has failed, we should get a `CancelledError`. self.failureResultOf(wrapper_deferred, CancelledError) - def test_coroutine_cancellation(self): + def test_coroutine_cancellation(self) -> None: """Test that cancellation of the new `Deferred` waits for the original.""" blocking_deferred: "Deferred[None]" = Deferred() completion_deferred: "Deferred[None]" = Deferred() - async def task(): + async def task() -> NoReturn: await blocking_deferred completion_deferred.callback(None) # Raise an exception. Twisted should consume it, otherwise unwanted @@ -434,7 +442,7 @@ class DelayCancellationTests(TestCase): # Now that the original coroutine has failed, we should get a `CancelledError`. self.failureResultOf(wrapper_deferred, CancelledError) - def test_suppresses_second_cancellation(self): + def test_suppresses_second_cancellation(self) -> None: """Test that a second cancellation is suppressed. Identical to `test_cancellation` except the new `Deferred` is cancelled twice. @@ -459,7 +467,7 @@ class DelayCancellationTests(TestCase): # Now that the original `Deferred` has failed, we should get a `CancelledError`. self.failureResultOf(wrapper_deferred, CancelledError) - def test_propagates_cancelled_error(self): + def test_propagates_cancelled_error(self) -> None: """Test that a `CancelledError` from the original `Deferred` gets propagated.""" deferred: "Deferred[str]" = Deferred() wrapper_deferred = delay_cancellation(deferred) @@ -472,14 +480,14 @@ class DelayCancellationTests(TestCase): self.assertTrue(wrapper_deferred.called) self.assertIs(cancelled_error, self.failureResultOf(wrapper_deferred).value) - def test_preserves_logcontext(self): + def test_preserves_logcontext(self) -> None: """Test that logging contexts are preserved.""" blocking_d: "Deferred[None]" = Deferred() - async def inner(): + async def inner() -> None: await make_deferred_yieldable(blocking_d) - async def outer(): + async def outer() -> None: with LoggingContext("c") as c: try: await delay_cancellation(inner()) @@ -503,7 +511,7 @@ class DelayCancellationTests(TestCase): class AwakenableSleeperTests(TestCase): "Tests AwakenableSleeper" - def test_sleep(self): + def test_sleep(self) -> None: reactor, _ = get_clock() sleeper = AwakenableSleeper(reactor) @@ -518,7 +526,7 @@ class AwakenableSleeperTests(TestCase): reactor.advance(0.6) self.assertTrue(d.called) - def test_explicit_wake(self): + def test_explicit_wake(self) -> None: reactor, _ = get_clock() sleeper = AwakenableSleeper(reactor) @@ -535,7 +543,7 @@ class AwakenableSleeperTests(TestCase): reactor.advance(0.6) - def test_multiple_sleepers_timeout(self): + def test_multiple_sleepers_timeout(self) -> None: reactor, _ = get_clock() sleeper = AwakenableSleeper(reactor) @@ -555,7 +563,7 @@ class AwakenableSleeperTests(TestCase): reactor.advance(0.6) self.assertTrue(d2.called) - def test_multiple_sleepers_wake(self): + def test_multiple_sleepers_wake(self) -> None: reactor, _ = get_clock() sleeper = AwakenableSleeper(reactor) |