diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2020-09-29 14:21:41 +0100 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2020-09-29 14:21:41 +0100 |
commit | f43c66d23b5639706bf5aca36ac5089311fac196 (patch) | |
tree | 37b68842bc8c607262c3875f9ecf2cef75e63c4c /synapse | |
parent | Changelog (diff) | |
parent | Add support for running Complement against the local checkout (#8317) (diff) | |
download | synapse-f43c66d23b5639706bf5aca36ac5089311fac196.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/info-mainline-no-check-password-reset
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/identity.py | 25 | ||||
-rw-r--r-- | synapse/http/__init__.py | 17 | ||||
-rw-r--r-- | synapse/http/client.py | 54 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 55 | ||||
-rw-r--r-- | synapse/http/proxyagent.py | 16 | ||||
-rw-r--r-- | synapse/replication/http/_base.py | 40 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/account.py | 15 | ||||
-rw-r--r-- | synapse/storage/databases/main/monthly_active_users.py | 9 | ||||
-rw-r--r-- | synapse/util/async_helpers.py | 47 |
9 files changed, 167 insertions, 111 deletions
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index ab15570f7a..bc3e9607ca 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -21,8 +21,6 @@ import logging import urllib.parse from typing import Awaitable, Callable, Dict, List, Optional, Tuple -from twisted.internet.error import TimeoutError - from synapse.api.errors import ( CodeMessageException, Codes, @@ -30,6 +28,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.config.emailconfig import ThreepidBehaviour +from synapse.http import RequestTimedOutError from synapse.http.client import SimpleHttpClient from synapse.types import JsonDict, Requester from synapse.util import json_decoder @@ -93,7 +92,7 @@ class IdentityHandler(BaseHandler): try: data = await self.http_client.get_json(url, query_params) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: logger.info( @@ -173,7 +172,7 @@ class IdentityHandler(BaseHandler): if e.code != 404 or not use_v2: logger.error("3PID bind failed with Matrix error: %r", e) raise e.to_synapse_error() - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except CodeMessageException as e: data = json_decoder.decode(e.msg) # XXX WAT? @@ -273,7 +272,7 @@ class IdentityHandler(BaseHandler): else: logger.error("Failed to unbind threepid on identity server: %s", e) raise SynapseError(500, "Failed to contact identity server") - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") await self.store.remove_user_bound_threepid( @@ -419,7 +418,7 @@ class IdentityHandler(BaseHandler): except HttpResponseException as e: logger.info("Proxied requestToken failed: %r", e) raise e.to_synapse_error() - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") async def requestMsisdnToken( @@ -471,7 +470,7 @@ class IdentityHandler(BaseHandler): except HttpResponseException as e: logger.info("Proxied requestToken failed: %r", e) raise e.to_synapse_error() - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") assert self.hs.config.public_baseurl @@ -553,7 +552,7 @@ class IdentityHandler(BaseHandler): id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken", body, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: logger.warning("Error contacting msisdn account_threepid_delegate: %s", e) @@ -627,7 +626,7 @@ class IdentityHandler(BaseHandler): # require or validate it. See the following for context: # https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950 return data["mxid"] - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except IOError as e: logger.warning("Error from v1 identity server lookup: %s" % (e,)) @@ -655,7 +654,7 @@ class IdentityHandler(BaseHandler): "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server), {"access_token": id_access_token}, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") if not isinstance(hash_details, dict): @@ -727,7 +726,7 @@ class IdentityHandler(BaseHandler): }, headers=headers, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except Exception as e: logger.warning("Error when performing a v2 3pid lookup: %s", e) @@ -823,7 +822,7 @@ class IdentityHandler(BaseHandler): invite_config, {"Authorization": create_id_access_token_header(id_access_token)}, ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: if e.code != 404: @@ -841,7 +840,7 @@ class IdentityHandler(BaseHandler): data = await self.blacklisting_http_client.post_json_get_json( url, invite_config ) - except TimeoutError: + except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") except HttpResponseException as e: logger.warning( diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index 8eb3638591..59b01b812c 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -16,8 +16,6 @@ import re from twisted.internet import task -from twisted.internet.defer import CancelledError -from twisted.python import failure from twisted.web.client import FileBodyProducer from synapse.api.errors import SynapseError @@ -26,19 +24,8 @@ from synapse.api.errors import SynapseError class RequestTimedOutError(SynapseError): """Exception representing timeout of an outbound request""" - def __init__(self): - super().__init__(504, "Timed out") - - -def cancelled_to_request_timed_out_error(value, timeout): - """Turns CancelledErrors into RequestTimedOutErrors. - - For use with async.add_timeout_to_deferred - """ - if isinstance(value, failure.Failure): - value.trap(CancelledError) - raise RequestTimedOutError() - return value + def __init__(self, msg): + super().__init__(504, msg) ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$") diff --git a/synapse/http/client.py b/synapse/http/client.py index 4694adc400..8324632cb6 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import urllib from io import BytesIO @@ -38,7 +37,7 @@ from zope.interface import implementer, provider from OpenSSL import SSL from OpenSSL.SSL import VERIFY_NONE -from twisted.internet import defer, protocol, ssl +from twisted.internet import defer, error as twisted_error, protocol, ssl from twisted.internet.interfaces import ( IReactorPluggableNameResolver, IResolutionReceiver, @@ -46,17 +45,18 @@ from twisted.internet.interfaces import ( from twisted.internet.task import Cooperator from twisted.python.failure import Failure from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, HTTPConnectionPool, readBody +from twisted.web.client import ( + Agent, + HTTPConnectionPool, + ResponseNeverReceived, + readBody, +) from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers from twisted.web.iweb import IResponse from synapse.api.errors import Codes, HttpResponseException, SynapseError -from synapse.http import ( - QuieterFileBodyProducer, - cancelled_to_request_timed_out_error, - redact_uri, -) +from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri from synapse.http.proxyagent import ProxyAgent from synapse.logging.context import make_deferred_yieldable from synapse.logging.opentracing import set_tag, start_active_span, tags @@ -332,8 +332,6 @@ class SimpleHttpClient: RequestTimedOutError if the request times out before the headers are read """ - # A small wrapper around self.agent.request() so we can easily attach - # counters to it outgoing_requests_counter.labels(method).inc() # log request but strip `access_token` (AS requests for example include this) @@ -362,15 +360,17 @@ class SimpleHttpClient: data=body_producer, headers=headers, **self._extra_treq_args - ) + ) # type: defer.Deferred + # we use our own timeout mechanism rather than treq's as a workaround # for https://twistedmatrix.com/trac/ticket/9534. request_deferred = timeout_deferred( - request_deferred, - 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, + request_deferred, 60, self.hs.get_reactor(), ) + + # turn timeouts into RequestTimedOutErrors + request_deferred.addErrback(_timeout_to_request_timed_out_error) + response = await make_deferred_yieldable(request_deferred) incoming_responses_counter.labels(method, response.code).inc() @@ -410,7 +410,7 @@ class SimpleHttpClient: parsed json Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -461,7 +461,7 @@ class SimpleHttpClient: parsed json Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -506,7 +506,7 @@ class SimpleHttpClient: Returns: Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -538,7 +538,7 @@ class SimpleHttpClient: Returns: Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -586,7 +586,7 @@ class SimpleHttpClient: Succeeds when we get a 2xx HTTP response, with the HTTP body as bytes. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -631,7 +631,7 @@ class SimpleHttpClient: headers, absolute URI of the response and HTTP response code. Raises: - RequestTimedOutException: if there is a timeout before the response headers + RequestTimedOutError: if there is a timeout before the response headers are received. Note there is currently no timeout on reading the response body. @@ -684,6 +684,18 @@ class SimpleHttpClient: ) +def _timeout_to_request_timed_out_error(f: Failure): + if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError): + # The TCP connection has its own timeout (set by the 'connectTimeout' param + # on the Agent), which raises twisted_error.TimeoutError exception. + raise RequestTimedOutError("Timeout connecting to remote server") + elif f.check(defer.TimeoutError, ResponseNeverReceived): + # this one means that we hit our overall timeout on the request + raise RequestTimedOutError("Timeout waiting for response from remote server") + + return f + + # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. # The two should be factored out. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index b02c74ab2d..c23a4d7c0c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -171,7 +171,7 @@ async def _handle_json_response( d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) body = await make_deferred_yieldable(d) - except TimeoutError as e: + except defer.TimeoutError as e: logger.warning( "{%s} [%s] Timed out reading response - %s %s", request.txn_id, @@ -655,10 +655,14 @@ class MatrixFederationHttpClient: long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. backoff_on_404 (bool): True if we should count a 404 response as @@ -704,8 +708,13 @@ class MatrixFederationHttpClient: timeout=timeout, ) + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = await _handle_json_response( - self.reactor, self.default_timeout, request, response, start_ms + self.reactor, _sec_timeout, request, response, start_ms ) return body @@ -734,10 +743,14 @@ class MatrixFederationHttpClient: long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. @@ -801,10 +814,14 @@ class MatrixFederationHttpClient: args (dict|None): A dictionary used to create query strings, defaults to None. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. @@ -840,8 +857,13 @@ class MatrixFederationHttpClient: timeout=timeout, ) + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = await _handle_json_response( - self.reactor, self.default_timeout, request, response, start_ms + self.reactor, _sec_timeout, request, response, start_ms ) return body @@ -865,10 +887,14 @@ class MatrixFederationHttpClient: long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. @@ -900,8 +926,13 @@ class MatrixFederationHttpClient: ignore_backoff=ignore_backoff, ) + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = await _handle_json_response( - self.reactor, self.default_timeout, request, response, start_ms + self.reactor, _sec_timeout, request, response, start_ms ) return body diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 332da02a8d..e32d3f43e0 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -44,8 +44,11 @@ class ProxyAgent(_AgentBase): `BrowserLikePolicyForHTTPS`, so unless you have special requirements you can leave this as-is. - connectTimeout (float): The amount of time that this Agent will wait - for the peer to accept a connection. + connectTimeout (Optional[float]): The amount of time that this Agent will wait + for the peer to accept a connection, in seconds. If 'None', + HostnameEndpoint's default (30s) will be used. + + This is used for connections to both proxies and destination servers. bindAddress (bytes): The local address for client sockets to bind to. @@ -108,6 +111,15 @@ class ProxyAgent(_AgentBase): Returns: Deferred[IResponse]: completes when the header of the response has been received (regardless of the response status code). + + Can fail with: + SchemeNotSupported: if the uri is not http or https + + twisted.internet.error.TimeoutError if the server we are connecting + to (proxy or destination) does not accept a connection before + connectTimeout. + + ... other things too. """ uri = uri.strip() if not _VALID_URI.match(uri): diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index b448da6710..64edadb624 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -20,18 +20,28 @@ import urllib from inspect import signature from typing import Dict, List, Tuple -from synapse.api.errors import ( - CodeMessageException, - HttpResponseException, - RequestSendFailed, - SynapseError, -) +from prometheus_client import Counter, Gauge + +from synapse.api.errors import HttpResponseException, SynapseError +from synapse.http import RequestTimedOutError from synapse.logging.opentracing import inject_active_span_byte_dict, trace from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import random_string logger = logging.getLogger(__name__) +_pending_outgoing_requests = Gauge( + "synapse_pending_outgoing_replication_requests", + "Number of active outgoing replication requests, by replication method name", + ["name"], +) + +_outgoing_request_counter = Counter( + "synapse_outgoing_replication_requests", + "Number of outgoing replication requests, by replication method name and result", + ["name", "code"], +) + class ReplicationEndpoint(metaclass=abc.ABCMeta): """Helper base class for defining new replication HTTP endpoints. @@ -138,7 +148,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): instance_map = hs.config.worker.instance_map + outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME) + @trace(opname="outgoing_replication_request") + @outgoing_gauge.track_inprogress() async def send_request(instance_name="master", **kwargs): if instance_name == local_instance_name: raise Exception("Trying to send HTTP request to self") @@ -193,23 +206,26 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): try: result = await request_func(uri, data, headers=headers) break - except CodeMessageException as e: - if e.code != 504 or not cls.RETRY_ON_TIMEOUT: + except RequestTimedOutError: + if not cls.RETRY_ON_TIMEOUT: raise - logger.warning("%s request timed out", cls.NAME) + logger.warning("%s request timed out; retrying", cls.NAME) # If we timed out we probably don't need to worry about backing # off too much, but lets just wait a little anyway. await clock.sleep(1) except HttpResponseException as e: # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And + # on the main process that we should send to the client. (And # importantly, not stack traces everywhere) + _outgoing_request_counter.labels(cls.NAME, e.code).inc() raise e.to_synapse_error() - except RequestSendFailed as e: - raise SynapseError(502, "Failed to talk to master") from e + except Exception as e: + _outgoing_request_counter.labels(cls.NAME, "ERR").inc() + raise SynapseError(502, "Failed to talk to main process") from e + _outgoing_request_counter.labels(cls.NAME, 200).inc() return result return send_request diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index ed0d0772f8..ab5815e7f7 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -96,8 +96,9 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): send_attempt = body["send_attempt"] next_link = body.get("next_link") # Optional param - # Raise if the provided next_link value isn't valid - assert_valid_next_link(self.hs, next_link) + if next_link: + # Raise if the provided next_link value isn't valid + assert_valid_next_link(self.hs, next_link) # The email will be sent to the stored address. # This avoids a potential account hijack by requesting a password reset to @@ -372,8 +373,9 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): Codes.THREEPID_DENIED, ) - # Raise if the provided next_link value isn't valid - assert_valid_next_link(self.hs, next_link) + if next_link: + # Raise if the provided next_link value isn't valid + assert_valid_next_link(self.hs, next_link) existing_user_id = await self.store.get_user_id_by_threepid("email", email) @@ -446,8 +448,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): Codes.THREEPID_DENIED, ) - # Raise if the provided next_link value isn't valid - assert_valid_next_link(self.hs, next_link) + if next_link: + # Raise if the provided next_link value isn't valid + assert_valid_next_link(self.hs, next_link) existing_user_id = await self.store.get_user_id_by_threepid("msisdn", msisdn) diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index e0cedd1aac..e93aad33cd 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -41,7 +41,14 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore): """ def _count_users(txn): - sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users" + # Exclude app service users + sql = """ + SELECT COALESCE(count(*), 0) + FROM monthly_active_users + LEFT JOIN users + ON monthly_active_users.user_id=users.name + WHERE (users.appservice_id IS NULL OR users.appservice_id = ''); + """ txn.execute(sql) (count,) = txn.fetchone() return count diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 67ce9a5f39..382f0cf3f0 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -449,18 +449,8 @@ class ReadWriteLock: R = TypeVar("R") -def _cancelled_to_timed_out_error(value: R, timeout: float) -> R: - if isinstance(value, failure.Failure): - value.trap(CancelledError) - raise defer.TimeoutError(timeout, "Deferred") - return value - - def timeout_deferred( - deferred: defer.Deferred, - timeout: float, - reactor: IReactorTime, - on_timeout_cancel: Optional[Callable[[Any, float], Any]] = None, + deferred: defer.Deferred, timeout: float, reactor: IReactorTime, ) -> defer.Deferred: """The in built twisted `Deferred.addTimeout` fails to time out deferreds that have a canceller that throws exceptions. This method creates a new @@ -469,27 +459,21 @@ def timeout_deferred( (See https://twistedmatrix.com/trac/ticket/9534) - NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred + NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred. + + NOTE: the TimeoutError raised by the resultant deferred is + twisted.internet.defer.TimeoutError, which is *different* to the built-in + TimeoutError, as well as various other TimeoutErrors you might have imported. Args: deferred: The Deferred to potentially timeout. timeout: Timeout in seconds reactor: The twisted reactor to use - on_timeout_cancel: A callable which is called immediately - after the deferred times out, and not if this deferred is - otherwise cancelled before the timeout. - It takes an arbitrary value, which is the value of the deferred at - that exact point in time (probably a CancelledError Failure), and - the timeout. - - The default callable (if none is provided) will translate a - CancelledError Failure into a defer.TimeoutError. Returns: - A new Deferred. + A new Deferred, which will errback with defer.TimeoutError on timeout. """ - new_d = defer.Deferred() timed_out = [False] @@ -502,18 +486,23 @@ def timeout_deferred( except: # noqa: E722, if we throw any exception it'll break time outs logger.exception("Canceller failed during timeout") + # the cancel() call should have set off a chain of errbacks which + # will have errbacked new_d, but in case it hasn't, errback it now. + if not new_d.called: - new_d.errback(defer.TimeoutError(timeout, "Deferred")) + new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,))) delayed_call = reactor.callLater(timeout, time_it_out) - def convert_cancelled(value): - if timed_out[0]: - to_call = on_timeout_cancel or _cancelled_to_timed_out_error - return to_call(value, timeout) + def convert_cancelled(value: failure.Failure): + # if the orgininal deferred was cancelled, and our timeout has fired, then + # the reason it was cancelled was due to our timeout. Turn the CancelledError + # into a TimeoutError. + if timed_out[0] and value.check(CancelledError): + raise defer.TimeoutError("Timed out after %gs" % (timeout,)) return value - deferred.addBoth(convert_cancelled) + deferred.addErrback(convert_cancelled) def cancel_timeout(result): # stop the pending call to cancel the deferred if it's been fired |