summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/http/client.py4
-rw-r--r--synapse/http/matrixfederationclient.py25
-rw-r--r--synapse/notifier.py13
-rw-r--r--synapse/util/async_helpers.py73
4 files changed, 43 insertions, 72 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ec339a92ad..3d05f83b8c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers
 from synapse.api.errors import Codes, HttpResponseException, SynapseError
 from synapse.http import cancelled_to_request_timed_out_error, redact_uri
 from synapse.http.endpoint import SpiderEndpoint
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
 
@@ -99,7 +99,7 @@ class SimpleHttpClient(object):
             request_deferred = treq.request(
                 method, uri, agent=self.agent, data=data, headers=headers
             )
-            add_timeout_to_deferred(
+            request_deferred = timeout_deferred(
                 request_deferred, 60, self.hs.get_reactor(),
                 cancelled_to_request_timed_out_error,
             )
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 530c0245a9..14b12cd1c4 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -44,7 +44,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util.async_helpers import timeout_no_seriously
+from synapse.util.async_helpers import timeout_deferred
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.metrics import Measure
 
@@ -145,8 +145,14 @@ def _handle_json_response(reactor, timeout_sec, request, response):
     """
     try:
         check_content_type_is_json(response.headers)
+
         d = treq.json_content(response)
-        d.addTimeout(timeout_sec, reactor)
+        d = timeout_deferred(
+            d,
+            timeout=timeout_sec,
+            reactor=reactor,
+        )
+
         body = yield make_deferred_yieldable(d)
     except Exception as e:
         logger.warn(
@@ -321,15 +327,10 @@ class MatrixFederationHttpClient(object):
                         reactor=self.hs.get_reactor(),
                         unbuffered=True
                     )
-                    request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
 
-                    # Sometimes the timeout above doesn't work, so lets hack yet
-                    # another layer of timeouts in in the vain hope that at some
-                    # point the world made sense and this really really really
-                    # should work.
-                    request_deferred = timeout_no_seriously(
+                    request_deferred = timeout_deferred(
                         request_deferred,
-                        timeout=_sec_timeout * 2,
+                        timeout=_sec_timeout,
                         reactor=self.hs.get_reactor(),
                     )
 
@@ -388,7 +389,11 @@ class MatrixFederationHttpClient(object):
                 # :'(
                 # Update transactions table?
                 d = treq.content(response)
-                d.addTimeout(_sec_timeout, self.hs.get_reactor())
+                d = timeout_deferred(
+                    d,
+                    timeout=_sec_timeout,
+                    reactor=self.hs.get_reactor(),
+                )
                 body = yield make_deferred_yieldable(d)
                 raise HttpResponseException(
                     response.code, response.phrase, body
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 82f391481c..2d683718fb 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -28,7 +28,7 @@ from synapse.types import StreamToken
 from synapse.util.async_helpers import (
     DeferredTimeoutError,
     ObservableDeferred,
-    add_timeout_to_deferred,
+    timeout_deferred,
 )
 from synapse.util.logcontext import PreserveLoggingContext, run_in_background
 from synapse.util.logutils import log_function
@@ -337,7 +337,7 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
-                    add_timeout_to_deferred(
+                    listener.deferred = timeout_deferred(
                         listener.deferred,
                         (end_time - now) / 1000.,
                         self.hs.get_reactor(),
@@ -559,11 +559,12 @@ class Notifier(object):
             if end_time <= now:
                 break
 
-            add_timeout_to_deferred(
-                listener.deferred.addTimeout,
-                (end_time - now) / 1000.,
-                self.hs.get_reactor(),
+            listener.deferred = timeout_deferred(
+                listener.deferred,
+                timeout=(end_time - now) / 1000.,
+                reactor=self.hs.get_reactor(),
             )
+
             try:
                 with PreserveLoggingContext():
                     yield listener.deferred
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 40c2946129..2e12bcda98 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -380,23 +380,25 @@ class DeferredTimeoutError(Exception):
     """
 
 
-def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
-    """
-    Add a timeout to a deferred by scheduling it to be cancelled after
-    timeout seconds.
+def _cancelled_to_timed_out_error(value, timeout):
+    if isinstance(value, failure.Failure):
+        value.trap(CancelledError)
+        raise DeferredTimeoutError(timeout, "Deferred")
+    return value
 
-    This is essentially a backport of deferred.addTimeout, which was introduced
-    in twisted 16.5.
 
-    If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
-    unless a cancelable function was passed to its initialization or unless
-    a different on_timeout_cancel callable is provided.
+def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
+    """The in built twisted `Deferred.addTimeout` fails to time out deferreds
+    that have a canceller that throws exceptions. This method creates a new
+    deferred that wraps and times out the given deferred, correctly handling
+    the case where the given deferred's canceller throws.
 
-    Args:
-        deferred (defer.Deferred): deferred to be timed out
-        timeout (Number): seconds to time out after
-        reactor (twisted.internet.reactor): the Twisted reactor to use
+    NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
 
+    Args:
+        deferred (Deferred)
+        timeout (float): Timeout in seconds
+        reactor (twisted.internet.reactor): The twisted reactor to use
         on_timeout_cancel (callable): A callable which is called immediately
             after the deferred times out, and not if this deferred is
             otherwise cancelled before the timeout.
@@ -407,47 +409,9 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
 
             The default callable (if none is provided) will translate a
             CancelledError Failure into a DeferredTimeoutError.
-    """
-    timed_out = [False]
 
-    def time_it_out():
-        timed_out[0] = True
-        deferred.cancel()
-
-    delayed_call = reactor.callLater(timeout, time_it_out)
-
-    def convert_cancelled(value):
-        if timed_out[0]:
-            to_call = on_timeout_cancel or _cancelled_to_timed_out_error
-            return to_call(value, timeout)
-        return value
-
-    deferred.addBoth(convert_cancelled)
-
-    def cancel_timeout(result):
-        # stop the pending call to cancel the deferred if it's been fired
-        if delayed_call.active():
-            delayed_call.cancel()
-        return result
-
-    deferred.addBoth(cancel_timeout)
-
-
-def _cancelled_to_timed_out_error(value, timeout):
-    if isinstance(value, failure.Failure):
-        value.trap(CancelledError)
-        raise DeferredTimeoutError(timeout, "Deferred")
-    return value
-
-
-def timeout_no_seriously(deferred, timeout, reactor):
-    """The in build twisted deferred addTimeout (and the method above)
-    completely fail to time things out under some unknown circumstances.
-
-    Lets try a different way of timing things out and maybe that will make
-    things work?!
-
-    TODO: Kill this with fire.
+    Returns:
+        Deferred
     """
 
     new_d = defer.Deferred()
@@ -466,7 +430,8 @@ def timeout_no_seriously(deferred, timeout, reactor):
 
     def convert_cancelled(value):
         if timed_out[0]:
-            return _cancelled_to_timed_out_error(value, timeout)
+            to_call = on_timeout_cancel or _cancelled_to_timed_out_error
+            return to_call(value, timeout)
         return value
 
     deferred.addBoth(convert_cancelled)