diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 20e568bc43..0d47ccdb59 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -28,7 +28,7 @@ class RequestTimedOutError(SynapseError):
def cancelled_to_request_timed_out_error(value):
"""Turns CancelledErrors into RequestTimedOutErrors.
- For use with deferred.addTimeout()
+ For use with async.add_timeout_to_deferred
"""
if isinstance(value, failure.Failure):
value.trap(CancelledError)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 35c8d51e71..62309c3365 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -20,6 +20,7 @@ from synapse.api.errors import (
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
)
from synapse.http import cancelled_to_request_timed_out_error
+from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
import synapse.metrics
@@ -102,8 +103,9 @@ class SimpleHttpClient(object):
request_deferred = self.agent.request(
method, uri, *args, **kwargs
)
- request_deferred.addTimeout(
- 60, reactor, cancelled_to_request_timed_out_error,
+ add_timeout_to_deferred(
+ request_deferred,
+ 60, cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(request_deferred)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index fe4b1636a6..30036fe81c 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
import synapse.metrics
-from synapse.util.async import sleep
+from synapse.util.async import sleep, add_timeout_to_deferred
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
import synapse.util.retryutils
@@ -193,9 +193,10 @@ class MatrixFederationHttpClient(object):
Headers(headers_dict),
producer
)
- request_deferred.addTimeout(
+ add_timeout_to_deferred(
+ request_deferred,
timeout / 1000. if timeout else 60,
- reactor, cancelled_to_request_timed_out_error,
+ cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(
request_deferred,
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1e4f78b993..a1c06e8fca 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -13,15 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer, reactor
-from twisted.internet.defer import TimeoutError
+from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.util.logutils import log_function
-from synapse.util.async import ObservableDeferred
+from synapse.util.async import (
+ ObservableDeferred, add_timeout_to_deferred,
+ DeferredTimeoutError,
+)
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import StreamToken
@@ -332,8 +334,9 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
- listener.deferred.addTimeout(
- (end_time - now) / 1000., reactor,
+ add_timeout_to_deferred(
+ listener.deferred,
+ (end_time - now) / 1000.,
)
with PreserveLoggingContext():
yield listener.deferred
@@ -347,7 +350,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
- except TimeoutError:
+ except DeferredTimeoutError:
break
except defer.CancelledError:
break
@@ -552,11 +555,14 @@ class Notifier(object):
if end_time <= now:
break
- listener.deferred.addTimeout((end_time - now) / 1000., reactor)
+ add_timeout_to_deferred(
+ listener.deferred.addTimeout,
+ (end_time - now) / 1000.,
+ )
try:
with PreserveLoggingContext():
yield listener.deferred
- except TimeoutError:
+ except DeferredTimeoutError:
break
except defer.CancelledError:
break
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 0729bb2863..1df5c5600c 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -15,6 +15,8 @@
from twisted.internet import defer, reactor
+from twisted.internet.defer import CancelledError
+from twisted.python import failure
from .logcontext import (
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
@@ -392,3 +394,68 @@ class ReadWriteLock(object):
self.key_to_current_writer.pop(key)
defer.returnValue(_ctx_manager())
+
+
+class DeferredTimeoutError(Exception):
+ """
+ This error is raised by default when a L{Deferred} times out.
+ """
+
+
+def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
+ """
+ Add a timeout to a deferred by scheduling it to be cancelled after
+ timeout seconds.
+
+ This is essentially a backport of deferred.addTimeout, which was introduced
+ in twisted 16.5.
+
+ If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
+ unless a cancelable function was passed to its initialization or unless
+ a different on_timeout_cancel callable is provided.
+
+ Args:
+ deferred (defer.Deferred): deferred to be timed out
+ timeout (Number): seconds to time out after
+
+ on_timeout_cancel (callable): A callable which is called immediately
+ after the deferred times out, and not if this deferred is
+ otherwise cancelled before the timeout.
+
+ It takes an arbitrary value, which is the value of the deferred at
+ that exact point in time (probably a CancelledError Failure), and
+ the timeout.
+
+ The default callable (if none is provided) will translate a
+ CancelledError Failure into a DeferredTimeoutError.
+ """
+ timed_out = [False]
+
+ def time_it_out():
+ timed_out[0] = True
+ deferred.cancel()
+
+ delayed_call = reactor.callLater(timeout, time_it_out)
+
+ def convert_cancelled(value):
+ if timed_out[0]:
+ to_call = on_timeout_cancel or _cancelled_to_timed_out_error
+ return to_call(value, timeout)
+ return value
+
+ deferred.addBoth(convert_cancelled)
+
+ def cancel_timeout(result):
+ # stop the pending call to cancel the deferred if it's been fired
+ if delayed_call.active():
+ delayed_call.cancel()
+ return result
+
+ deferred.addBoth(cancel_timeout)
+
+
+def _cancelled_to_timed_out_error(value, timeout):
+ if isinstance(value, failure.Failure):
+ value.trap(CancelledError)
+ raise DeferredTimeoutError(timeout, "Deferred")
+ return value
|