diff --git a/synapse/http/client.py b/synapse/http/client.py
index ec339a92ad..3d05f83b8c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
@@ -99,7 +99,7 @@ class SimpleHttpClient(object):
request_deferred = treq.request(
method, uri, agent=self.agent, data=data, headers=headers
)
- add_timeout_to_deferred(
+ request_deferred = timeout_deferred(
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 530c0245a9..14b12cd1c4 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -44,7 +44,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util.async_helpers import timeout_no_seriously
+from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure
@@ -145,8 +145,14 @@ def _handle_json_response(reactor, timeout_sec, request, response):
"""
try:
check_content_type_is_json(response.headers)
+
d = treq.json_content(response)
- d.addTimeout(timeout_sec, reactor)
+ d = timeout_deferred(
+ d,
+ timeout=timeout_sec,
+ reactor=reactor,
+ )
+
body = yield make_deferred_yieldable(d)
except Exception as e:
logger.warn(
@@ -321,15 +327,10 @@ class MatrixFederationHttpClient(object):
reactor=self.hs.get_reactor(),
unbuffered=True
)
- request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
- # Sometimes the timeout above doesn't work, so lets hack yet
- # another layer of timeouts in in the vain hope that at some
- # point the world made sense and this really really really
- # should work.
- request_deferred = timeout_no_seriously(
+ request_deferred = timeout_deferred(
request_deferred,
- timeout=_sec_timeout * 2,
+ timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
@@ -388,7 +389,11 @@ class MatrixFederationHttpClient(object):
# :'(
# Update transactions table?
d = treq.content(response)
- d.addTimeout(_sec_timeout, self.hs.get_reactor())
+ d = timeout_deferred(
+ d,
+ timeout=_sec_timeout,
+ reactor=self.hs.get_reactor(),
+ )
body = yield make_deferred_yieldable(d)
raise HttpResponseException(
response.code, response.phrase, body
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 82f391481c..f1d92c1395 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -25,11 +25,7 @@ from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
from synapse.types import StreamToken
-from synapse.util.async_helpers import (
- DeferredTimeoutError,
- ObservableDeferred,
- add_timeout_to_deferred,
-)
+from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -337,7 +333,7 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
- add_timeout_to_deferred(
+ listener.deferred = timeout_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
@@ -354,7 +350,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
- except DeferredTimeoutError:
+ except defer.TimeoutError:
break
except defer.CancelledError:
break
@@ -559,15 +555,16 @@ class Notifier(object):
if end_time <= now:
break
- add_timeout_to_deferred(
- listener.deferred.addTimeout,
- (end_time - now) / 1000.,
- self.hs.get_reactor(),
+ listener.deferred = timeout_deferred(
+ listener.deferred,
+ timeout=(end_time - now) / 1000.,
+ reactor=self.hs.get_reactor(),
)
+
try:
with PreserveLoggingContext():
yield listener.deferred
- except DeferredTimeoutError:
+ except defer.TimeoutError:
break
except defer.CancelledError:
break
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 40c2946129..ec7b2c9672 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -374,29 +374,25 @@ class ReadWriteLock(object):
defer.returnValue(_ctx_manager())
-class DeferredTimeoutError(Exception):
- """
- This error is raised by default when a L{Deferred} times out.
- """
-
+def _cancelled_to_timed_out_error(value, timeout):
+ if isinstance(value, failure.Failure):
+ value.trap(CancelledError)
+ raise defer.TimeoutError(timeout, "Deferred")
+ return value
-def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
- """
- Add a timeout to a deferred by scheduling it to be cancelled after
- timeout seconds.
- This is essentially a backport of deferred.addTimeout, which was introduced
- in twisted 16.5.
+def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
+ """The in built twisted `Deferred.addTimeout` fails to time out deferreds
+ that have a canceller that throws exceptions. This method creates a new
+ deferred that wraps and times out the given deferred, correctly handling
+ the case where the given deferred's canceller throws.
- If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
- unless a cancelable function was passed to its initialization or unless
- a different on_timeout_cancel callable is provided.
+ NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
Args:
- deferred (defer.Deferred): deferred to be timed out
- timeout (Number): seconds to time out after
- reactor (twisted.internet.reactor): the Twisted reactor to use
-
+ deferred (Deferred)
+ timeout (float): Timeout in seconds
+ reactor (twisted.internet.reactor): The twisted reactor to use
on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout.
@@ -406,48 +402,10 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
the timeout.
The default callable (if none is provided) will translate a
- CancelledError Failure into a DeferredTimeoutError.
- """
- timed_out = [False]
-
- def time_it_out():
- timed_out[0] = True
- deferred.cancel()
-
- delayed_call = reactor.callLater(timeout, time_it_out)
-
- def convert_cancelled(value):
- if timed_out[0]:
- to_call = on_timeout_cancel or _cancelled_to_timed_out_error
- return to_call(value, timeout)
- return value
-
- deferred.addBoth(convert_cancelled)
+ CancelledError Failure into a defer.TimeoutError.
- def cancel_timeout(result):
- # stop the pending call to cancel the deferred if it's been fired
- if delayed_call.active():
- delayed_call.cancel()
- return result
-
- deferred.addBoth(cancel_timeout)
-
-
-def _cancelled_to_timed_out_error(value, timeout):
- if isinstance(value, failure.Failure):
- value.trap(CancelledError)
- raise DeferredTimeoutError(timeout, "Deferred")
- return value
-
-
-def timeout_no_seriously(deferred, timeout, reactor):
- """The in build twisted deferred addTimeout (and the method above)
- completely fail to time things out under some unknown circumstances.
-
- Lets try a different way of timing things out and maybe that will make
- things work?!
-
- TODO: Kill this with fire.
+ Returns:
+ Deferred
"""
new_d = defer.Deferred()
@@ -457,16 +415,20 @@ def timeout_no_seriously(deferred, timeout, reactor):
def time_it_out():
timed_out[0] = True
- if not new_d.called:
- new_d.errback(DeferredTimeoutError(timeout, "Deferred"))
+ try:
+ deferred.cancel()
+ except: # noqa: E722, if we throw any exception it'll break time outs
+ logger.exception("Canceller failed during timeout")
- deferred.cancel()
+ if not new_d.called:
+ new_d.errback(defer.TimeoutError(timeout, "Deferred"))
delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value):
if timed_out[0]:
- return _cancelled_to_timed_out_error(value, timeout)
+ to_call = on_timeout_cancel or _cancelled_to_timed_out_error
+ return to_call(value, timeout)
return value
deferred.addBoth(convert_cancelled)
|