From 1ea904b9f09ea6a9df7792f61029d0250602d46b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Apr 2018 00:53:18 +0100 Subject: Use deferred.addTimeout instead of time_bound_deferred This doesn't feel like a wheel we need to reinvent. --- synapse/http/__init__.py | 22 +++++++++++++ synapse/http/client.py | 20 +++++------- synapse/http/matrixfederationclient.py | 35 ++++++++++----------- synapse/notifier.py | 23 +++++++------- synapse/util/__init__.py | 56 ---------------------------------- 5 files changed, 59 insertions(+), 97 deletions(-) (limited to 'synapse') diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index bfebb0f644..20e568bc43 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,3 +13,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet.defer import CancelledError +from twisted.python import failure + +from synapse.api.errors import SynapseError + + +class RequestTimedOutError(SynapseError): + """Exception representing timeout of an outbound request""" + def __init__(self): + super(RequestTimedOutError, self).__init__(504, "Timed out") + + +def cancelled_to_request_timed_out_error(value): + """Turns CancelledErrors into RequestTimedOutErrors. + + For use with deferred.addTimeout() + """ + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise RequestTimedOutError() + return value diff --git a/synapse/http/client.py b/synapse/http/client.py index f3e4973c2e..35c8d51e71 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,9 +19,9 @@ from OpenSSL.SSL import VERIFY_NONE from synapse.api.errors import ( CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, ) +from synapse.http import cancelled_to_request_timed_out_error from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable -from synapse.util import logcontext import synapse.metrics from synapse.http.endpoint import SpiderEndpoint @@ -95,21 +96,16 @@ class SimpleHttpClient(object): # counters to it outgoing_requests_counter.inc(method) - def send_request(): + logger.info("Sending request %s %s", method, uri) + + try: request_deferred = self.agent.request( method, uri, *args, **kwargs ) - - return self.clock.time_bound_deferred( - request_deferred, - time_out=60, + request_deferred.addTimeout( + 60, reactor, cancelled_to_request_timed_out_error, ) - - logger.info("Sending request %s %s", method, uri) - - try: - with logcontext.PreserveLoggingContext(): - response = yield send_request() + response = yield make_deferred_yieldable(request_deferred) incoming_responses_counter.inc(method, response.code) logger.info( diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 60a29081e8..fe4b1636a6 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,17 +13,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import synapse.util.retryutils from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError from twisted.web.client import readBody, HTTPConnectionPool, Agent from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone +from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint +import synapse.metrics from synapse.util.async import sleep from synapse.util import logcontext -import synapse.metrics +from synapse.util.logcontext import make_deferred_yieldable +import synapse.util.retryutils from canonicaljson import encode_canonical_json @@ -184,21 +187,19 @@ class MatrixFederationHttpClient(object): producer = body_callback(method, http_url_bytes, headers_dict) try: - def send_request(): - request_deferred = self.agent.request( - method, - url_bytes, - Headers(headers_dict), - producer - ) - - return self.clock.time_bound_deferred( - request_deferred, - time_out=timeout / 1000. if timeout else 60, - ) - - with logcontext.PreserveLoggingContext(): - response = yield send_request() + request_deferred = self.agent.request( + method, + url_bytes, + Headers(headers_dict), + producer + ) + request_deferred.addTimeout( + timeout / 1000. if timeout else 60, + reactor, cancelled_to_request_timed_out_error, + ) + response = yield make_deferred_yieldable( + request_deferred, + ) log_result = "%d %s" % (response.code, response.phrase,) break diff --git a/synapse/notifier.py b/synapse/notifier.py index 0e40a4aad6..1e4f78b993 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -13,12 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor +from twisted.internet.defer import TimeoutError + from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state -from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred from synapse.util.logcontext import PreserveLoggingContext, preserve_fn @@ -331,11 +332,11 @@ class Notifier(object): # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) + listener.deferred.addTimeout( + (end_time - now) / 1000., reactor, + ) with PreserveLoggingContext(): - yield self.clock.time_bound_deferred( - listener.deferred, - time_out=(end_time - now) / 1000. - ) + yield listener.deferred current_token = user_stream.current_token @@ -346,7 +347,7 @@ class Notifier(object): # Update the prev_token to the current_token since nothing # has happened between the old prev_token and the current_token prev_token = current_token - except DeferredTimedOutError: + except TimeoutError: break except defer.CancelledError: break @@ -551,13 +552,11 @@ class Notifier(object): if end_time <= now: break + listener.deferred.addTimeout((end_time - now) / 1000., reactor) try: with PreserveLoggingContext(): - yield self.clock.time_bound_deferred( - listener.deferred, - time_out=(end_time - now) / 1000. - ) - except DeferredTimedOutError: + yield listener.deferred + except TimeoutError: break except defer.CancelledError: break diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 756d8ffa32..814a7bf71b 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.errors import SynapseError from synapse.util.logcontext import PreserveLoggingContext from twisted.internet import defer, reactor, task @@ -24,11 +23,6 @@ import logging logger = logging.getLogger(__name__) -class DeferredTimedOutError(SynapseError): - def __init__(self): - super(DeferredTimedOutError, self).__init__(504, "Timed out") - - def unwrapFirstError(failure): # defer.gatherResults and DeferredLists wrap failures. failure.trap(defer.FirstError) @@ -85,53 +79,3 @@ class Clock(object): except Exception: if not ignore_errs: raise - - def time_bound_deferred(self, given_deferred, time_out): - if given_deferred.called: - return given_deferred - - ret_deferred = defer.Deferred() - - def timed_out_fn(): - e = DeferredTimedOutError() - - try: - ret_deferred.errback(e) - except Exception: - pass - - try: - given_deferred.cancel() - except Exception: - pass - - timer = None - - def cancel(res): - try: - self.cancel_call_later(timer) - except Exception: - pass - return res - - ret_deferred.addBoth(cancel) - - def success(res): - try: - ret_deferred.callback(res) - except Exception: - pass - - return res - - def err(res): - try: - ret_deferred.errback(res) - except Exception: - pass - - given_deferred.addCallbacks(callback=success, errback=err) - - timer = self.call_later(time_out, timed_out_fn) - - return ret_deferred -- cgit 1.4.1 From 9d2c1b8429996cb9766ac636034485e2a1d685cc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 12:52:30 +0100 Subject: Backport deferred.addTimeout Twisted 16.0 doesn't have addTimeout, so let's backport it. --- synapse/http/__init__.py | 2 +- synapse/http/client.py | 6 ++- synapse/http/matrixfederationclient.py | 7 ++-- synapse/notifier.py | 22 +++++++---- synapse/util/async.py | 67 ++++++++++++++++++++++++++++++++++ 5 files changed, 90 insertions(+), 14 deletions(-) (limited to 'synapse') diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index 20e568bc43..0d47ccdb59 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -28,7 +28,7 @@ class RequestTimedOutError(SynapseError): def cancelled_to_request_timed_out_error(value): """Turns CancelledErrors into RequestTimedOutErrors. - For use with deferred.addTimeout() + For use with async.add_timeout_to_deferred """ if isinstance(value, failure.Failure): value.trap(CancelledError) diff --git a/synapse/http/client.py b/synapse/http/client.py index 35c8d51e71..62309c3365 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -20,6 +20,7 @@ from synapse.api.errors import ( CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, ) from synapse.http import cancelled_to_request_timed_out_error +from synapse.util.async import add_timeout_to_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable import synapse.metrics @@ -102,8 +103,9 @@ class SimpleHttpClient(object): request_deferred = self.agent.request( method, uri, *args, **kwargs ) - request_deferred.addTimeout( - 60, reactor, cancelled_to_request_timed_out_error, + add_timeout_to_deferred( + request_deferred, + 60, cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable(request_deferred) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index fe4b1636a6..30036fe81c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint import synapse.metrics -from synapse.util.async import sleep +from synapse.util.async import sleep, add_timeout_to_deferred from synapse.util import logcontext from synapse.util.logcontext import make_deferred_yieldable import synapse.util.retryutils @@ -193,9 +193,10 @@ class MatrixFederationHttpClient(object): Headers(headers_dict), producer ) - request_deferred.addTimeout( + add_timeout_to_deferred( + request_deferred, timeout / 1000. if timeout else 60, - reactor, cancelled_to_request_timed_out_error, + cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable( request_deferred, diff --git a/synapse/notifier.py b/synapse/notifier.py index 1e4f78b993..a1c06e8fca 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -13,15 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, reactor -from twisted.internet.defer import TimeoutError +from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state from synapse.util.logutils import log_function -from synapse.util.async import ObservableDeferred +from synapse.util.async import ( + ObservableDeferred, add_timeout_to_deferred, + DeferredTimeoutError, +) from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.metrics import Measure from synapse.types import StreamToken @@ -332,8 +334,9 @@ class Notifier(object): # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) - listener.deferred.addTimeout( - (end_time - now) / 1000., reactor, + add_timeout_to_deferred( + listener.deferred, + (end_time - now) / 1000., ) with PreserveLoggingContext(): yield listener.deferred @@ -347,7 +350,7 @@ class Notifier(object): # Update the prev_token to the current_token since nothing # has happened between the old prev_token and the current_token prev_token = current_token - except TimeoutError: + except DeferredTimeoutError: break except defer.CancelledError: break @@ -552,11 +555,14 @@ class Notifier(object): if end_time <= now: break - listener.deferred.addTimeout((end_time - now) / 1000., reactor) + add_timeout_to_deferred( + listener.deferred.addTimeout, + (end_time - now) / 1000., + ) try: with PreserveLoggingContext(): yield listener.deferred - except TimeoutError: + except DeferredTimeoutError: break except defer.CancelledError: break diff --git a/synapse/util/async.py b/synapse/util/async.py index 0729bb2863..1df5c5600c 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -15,6 +15,8 @@ from twisted.internet import defer, reactor +from twisted.internet.defer import CancelledError +from twisted.python import failure from .logcontext import ( PreserveLoggingContext, make_deferred_yieldable, preserve_fn @@ -392,3 +394,68 @@ class ReadWriteLock(object): self.key_to_current_writer.pop(key) defer.returnValue(_ctx_manager()) + + +class DeferredTimeoutError(Exception): + """ + This error is raised by default when a L{Deferred} times out. + """ + + +def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None): + """ + Add a timeout to a deferred by scheduling it to be cancelled after + timeout seconds. + + This is essentially a backport of deferred.addTimeout, which was introduced + in twisted 16.5. + + If the deferred gets timed out, it errbacks with a DeferredTimeoutError, + unless a cancelable function was passed to its initialization or unless + a different on_timeout_cancel callable is provided. + + Args: + deferred (defer.Deferred): deferred to be timed out + timeout (Number): seconds to time out after + + on_timeout_cancel (callable): A callable which is called immediately + after the deferred times out, and not if this deferred is + otherwise cancelled before the timeout. + + It takes an arbitrary value, which is the value of the deferred at + that exact point in time (probably a CancelledError Failure), and + the timeout. + + The default callable (if none is provided) will translate a + CancelledError Failure into a DeferredTimeoutError. + """ + timed_out = [False] + + def time_it_out(): + timed_out[0] = True + deferred.cancel() + + delayed_call = reactor.callLater(timeout, time_it_out) + + def convert_cancelled(value): + if timed_out[0]: + to_call = on_timeout_cancel or _cancelled_to_timed_out_error + return to_call(value, timeout) + return value + + deferred.addBoth(convert_cancelled) + + def cancel_timeout(result): + # stop the pending call to cancel the deferred if it's been fired + if delayed_call.active(): + delayed_call.cancel() + return result + + deferred.addBoth(cancel_timeout) + + +def _cancelled_to_timed_out_error(value, timeout): + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise DeferredTimeoutError(timeout, "Deferred") + return value -- cgit 1.4.1