summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/4407.bugfix1
-rw-r--r--synapse/python_dependencies.py11
-rw-r--r--synapse/util/async_helpers.py4
-rw-r--r--tests/util/test_async_utils.py104
4 files changed, 117 insertions, 3 deletions
diff --git a/changelog.d/4407.bugfix b/changelog.d/4407.bugfix
new file mode 100644
index 0000000000..54c5e76d1f
--- /dev/null
+++ b/changelog.d/4407.bugfix
@@ -0,0 +1 @@
+Fix incorrect logcontexts after a Deferred was cancelled
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 2fe790f95d..882e844eb1 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -40,7 +40,11 @@ REQUIREMENTS = [
     "signedjson>=1.0.0",
     "pynacl>=1.2.1",
     "service_identity>=16.0.0",
-    "Twisted>=17.1.0",
+
+    # our logcontext handling relies on the ability to cancel inlineCallbacks
+    # (https://twistedmatrix.com/trac/ticket/4632) which landed in Twisted 18.7.
+    "Twisted>=18.7.0",
+
     "treq>=15.1",
     # Twisted has required pyopenssl 16.0 since about Twisted 16.6.
     "pyopenssl>=16.0.0",
@@ -59,8 +63,11 @@ REQUIREMENTS = [
     # prometheus_client 0.4.0 changed the format of counter metrics
     # (cf https://github.com/matrix-org/synapse/issues/4001)
     "prometheus_client>=0.0.18,<0.4.0",
+
     # we use attr.s(slots), which arrived in 16.0.0
-    "attrs>=16.0.0",
+    # Twisted 18.7.0 requires attrs>=17.4.0
+    "attrs>=17.4.0",
+
     "netaddr>=0.7.18",
 ]
 
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index ec7b2c9672..430bb15f51 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -387,12 +387,14 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
     deferred that wraps and times out the given deferred, correctly handling
     the case where the given deferred's canceller throws.
 
+    (See https://twistedmatrix.com/trac/ticket/9534)
+
     NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
 
     Args:
         deferred (Deferred)
         timeout (float): Timeout in seconds
-        reactor (twisted.internet.reactor): The twisted reactor to use
+        reactor (twisted.interfaces.IReactorTime): The twisted reactor to use
         on_timeout_cancel (callable): A callable which is called immediately
             after the deferred times out, and not if this deferred is
             otherwise cancelled before the timeout.
diff --git a/tests/util/test_async_utils.py b/tests/util/test_async_utils.py
new file mode 100644
index 0000000000..84dd71e47a
--- /dev/null
+++ b/tests/util/test_async_utils.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+from twisted.internet.defer import CancelledError, Deferred
+from twisted.internet.task import Clock
+
+from synapse.util import logcontext
+from synapse.util.async_helpers import timeout_deferred
+from synapse.util.logcontext import LoggingContext
+
+from tests.unittest import TestCase
+
+
+class TimeoutDeferredTest(TestCase):
+    def setUp(self):
+        self.clock = Clock()
+
+    def test_times_out(self):
+        """Basic test case that checks that the original deferred is cancelled and that
+        the timing-out deferred is errbacked
+        """
+        cancelled = [False]
+
+        def canceller(_d):
+            cancelled[0] = True
+
+        non_completing_d = Deferred(canceller)
+        timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock)
+
+        self.assertNoResult(timing_out_d)
+        self.assertFalse(cancelled[0], "deferred was cancelled prematurely")
+
+        self.clock.pump((1.0, ))
+
+        self.assertTrue(cancelled[0], "deferred was not cancelled by timeout")
+        self.failureResultOf(timing_out_d, defer.TimeoutError, )
+
+    def test_times_out_when_canceller_throws(self):
+        """Test that we have successfully worked around
+        https://twistedmatrix.com/trac/ticket/9534"""
+
+        def canceller(_d):
+            raise Exception("can't cancel this deferred")
+
+        non_completing_d = Deferred(canceller)
+        timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock)
+
+        self.assertNoResult(timing_out_d)
+
+        self.clock.pump((1.0, ))
+
+        self.failureResultOf(timing_out_d, defer.TimeoutError, )
+
+    def test_logcontext_is_preserved_on_cancellation(self):
+        blocking_was_cancelled = [False]
+
+        @defer.inlineCallbacks
+        def blocking():
+            non_completing_d = Deferred()
+            with logcontext.PreserveLoggingContext():
+                try:
+                    yield non_completing_d
+                except CancelledError:
+                    blocking_was_cancelled[0] = True
+                    raise
+
+        with logcontext.LoggingContext("one") as context_one:
+            # the errbacks should be run in the test logcontext
+            def errback(res, deferred_name):
+                self.assertIs(
+                    LoggingContext.current_context(), context_one,
+                    "errback %s run in unexpected logcontext %s" % (
+                        deferred_name, LoggingContext.current_context(),
+                    )
+                )
+                return res
+
+            original_deferred = blocking()
+            original_deferred.addErrback(errback, "orig")
+            timing_out_d = timeout_deferred(original_deferred, 1.0, self.clock)
+            self.assertNoResult(timing_out_d)
+            self.assertIs(LoggingContext.current_context(), LoggingContext.sentinel)
+            timing_out_d.addErrback(errback, "timingout")
+
+            self.clock.pump((1.0, ))
+
+            self.assertTrue(
+                blocking_was_cancelled[0],
+                "non-completing deferred was not cancelled",
+            )
+            self.failureResultOf(timing_out_d, defer.TimeoutError, )
+            self.assertIs(LoggingContext.current_context(), context_one)