diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index ab15570f7a..bc3e9607ca 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -21,8 +21,6 @@ import logging
import urllib.parse
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
-from twisted.internet.error import TimeoutError
-
from synapse.api.errors import (
CodeMessageException,
Codes,
@@ -30,6 +28,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.config.emailconfig import ThreepidBehaviour
+from synapse.http import RequestTimedOutError
from synapse.http.client import SimpleHttpClient
from synapse.types import JsonDict, Requester
from synapse.util import json_decoder
@@ -93,7 +92,7 @@ class IdentityHandler(BaseHandler):
try:
data = await self.http_client.get_json(url, query_params)
- except TimeoutError:
+ except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except HttpResponseException as e:
logger.info(
@@ -173,7 +172,7 @@ class IdentityHandler(BaseHandler):
if e.code != 404 or not use_v2:
logger.error("3PID bind failed with Matrix error: %r", e)
raise e.to_synapse_error()
- except TimeoutError:
+ except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except CodeMessageException as e:
data = json_decoder.decode(e.msg) # XXX WAT?
@@ -273,7 +272,7 @@ class IdentityHandler(BaseHandler):
else:
logger.error("Failed to unbind threepid on identity server: %s", e)
raise SynapseError(500, "Failed to contact identity server")
- except TimeoutError:
+ except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
await self.store.remove_user_bound_threepid(
@@ -419,7 +418,7 @@ class IdentityHandler(BaseHandler):
except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e.to_synapse_error()
- except TimeoutError:
+ except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
async def requestMsisdnToken(
@@ -471,7 +470,7 @@ class IdentityHandler(BaseHandler):
except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e.to_synapse_error()
- except TimeoutError:
+ except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
assert self.hs.config.public_baseurl
@@ -553,7 +552,7 @@ class IdentityHandler(BaseHandler):
id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken",
body,
)
- except TimeoutError:
+ except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except HttpResponseException as e:
logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
@@ -627,7 +626,7 @@ class IdentityHandler(BaseHandler):
# require or validate it. See the following for context:
# https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950
return data["mxid"]
- except TimeoutError:
+ except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except IOError as e:
logger.warning("Error from v1 identity server lookup: %s" % (e,))
@@ -655,7 +654,7 @@ class IdentityHandler(BaseHandler):
"%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
{"access_token": id_access_token},
)
- except TimeoutError:
+ except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
if not isinstance(hash_details, dict):
@@ -727,7 +726,7 @@ class IdentityHandler(BaseHandler):
},
headers=headers,
)
- except TimeoutError:
+ except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except Exception as e:
logger.warning("Error when performing a v2 3pid lookup: %s", e)
@@ -823,7 +822,7 @@ class IdentityHandler(BaseHandler):
invite_config,
{"Authorization": create_id_access_token_header(id_access_token)},
)
- except TimeoutError:
+ except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except HttpResponseException as e:
if e.code != 404:
@@ -841,7 +840,7 @@ class IdentityHandler(BaseHandler):
data = await self.blacklisting_http_client.post_json_get_json(
url, invite_config
)
- except TimeoutError:
+ except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
except HttpResponseException as e:
logger.warning(
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 8eb3638591..59b01b812c 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -16,8 +16,6 @@
import re
from twisted.internet import task
-from twisted.internet.defer import CancelledError
-from twisted.python import failure
from twisted.web.client import FileBodyProducer
from synapse.api.errors import SynapseError
@@ -26,19 +24,8 @@ from synapse.api.errors import SynapseError
class RequestTimedOutError(SynapseError):
"""Exception representing timeout of an outbound request"""
- def __init__(self):
- super().__init__(504, "Timed out")
-
-
-def cancelled_to_request_timed_out_error(value, timeout):
- """Turns CancelledErrors into RequestTimedOutErrors.
-
- For use with async.add_timeout_to_deferred
- """
- if isinstance(value, failure.Failure):
- value.trap(CancelledError)
- raise RequestTimedOutError()
- return value
+ def __init__(self, msg):
+ super().__init__(504, msg)
ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$")
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 4694adc400..8324632cb6 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
import urllib
from io import BytesIO
@@ -38,7 +37,7 @@ from zope.interface import implementer, provider
from OpenSSL import SSL
from OpenSSL.SSL import VERIFY_NONE
-from twisted.internet import defer, protocol, ssl
+from twisted.internet import defer, error as twisted_error, protocol, ssl
from twisted.internet.interfaces import (
IReactorPluggableNameResolver,
IResolutionReceiver,
@@ -46,17 +45,18 @@ from twisted.internet.interfaces import (
from twisted.internet.task import Cooperator
from twisted.python.failure import Failure
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool, readBody
+from twisted.web.client import (
+ Agent,
+ HTTPConnectionPool,
+ ResponseNeverReceived,
+ readBody,
+)
from twisted.web.http import PotentialDataLoss
from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse
from synapse.api.errors import Codes, HttpResponseException, SynapseError
-from synapse.http import (
- QuieterFileBodyProducer,
- cancelled_to_request_timed_out_error,
- redact_uri,
-)
+from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
from synapse.http.proxyagent import ProxyAgent
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag, start_active_span, tags
@@ -332,8 +332,6 @@ class SimpleHttpClient:
RequestTimedOutError if the request times out before the headers are read
"""
- # A small wrapper around self.agent.request() so we can easily attach
- # counters to it
outgoing_requests_counter.labels(method).inc()
# log request but strip `access_token` (AS requests for example include this)
@@ -362,15 +360,17 @@ class SimpleHttpClient:
data=body_producer,
headers=headers,
**self._extra_treq_args
- )
+ ) # type: defer.Deferred
+
# we use our own timeout mechanism rather than treq's as a workaround
# for https://twistedmatrix.com/trac/ticket/9534.
request_deferred = timeout_deferred(
- request_deferred,
- 60,
- self.hs.get_reactor(),
- cancelled_to_request_timed_out_error,
+ request_deferred, 60, self.hs.get_reactor(),
)
+
+ # turn timeouts into RequestTimedOutErrors
+ request_deferred.addErrback(_timeout_to_request_timed_out_error)
+
response = await make_deferred_yieldable(request_deferred)
incoming_responses_counter.labels(method, response.code).inc()
@@ -410,7 +410,7 @@ class SimpleHttpClient:
parsed json
Raises:
- RequestTimedOutException: if there is a timeout before the response headers
+ RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@@ -461,7 +461,7 @@ class SimpleHttpClient:
parsed json
Raises:
- RequestTimedOutException: if there is a timeout before the response headers
+ RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@@ -506,7 +506,7 @@ class SimpleHttpClient:
Returns:
Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
Raises:
- RequestTimedOutException: if there is a timeout before the response headers
+ RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@@ -538,7 +538,7 @@ class SimpleHttpClient:
Returns:
Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
Raises:
- RequestTimedOutException: if there is a timeout before the response headers
+ RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@@ -586,7 +586,7 @@ class SimpleHttpClient:
Succeeds when we get a 2xx HTTP response, with the
HTTP body as bytes.
Raises:
- RequestTimedOutException: if there is a timeout before the response headers
+ RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@@ -631,7 +631,7 @@ class SimpleHttpClient:
headers, absolute URI of the response and HTTP response code.
Raises:
- RequestTimedOutException: if there is a timeout before the response headers
+ RequestTimedOutError: if there is a timeout before the response headers
are received. Note there is currently no timeout on reading the response
body.
@@ -684,6 +684,18 @@ class SimpleHttpClient:
)
+def _timeout_to_request_timed_out_error(f: Failure):
+ if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
+ # The TCP connection has its own timeout (set by the 'connectTimeout' param
+ # on the Agent), which raises twisted_error.TimeoutError exception.
+ raise RequestTimedOutError("Timeout connecting to remote server")
+ elif f.check(defer.TimeoutError, ResponseNeverReceived):
+ # this one means that we hit our overall timeout on the request
+ raise RequestTimedOutError("Timeout waiting for response from remote server")
+
+ return f
+
+
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
# The two should be factored out.
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b02c74ab2d..c23a4d7c0c 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -171,7 +171,7 @@ async def _handle_json_response(
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
body = await make_deferred_yieldable(d)
- except TimeoutError as e:
+ except defer.TimeoutError as e:
logger.warning(
"{%s} [%s] Timed out reading response - %s %s",
request.txn_id,
@@ -655,10 +655,14 @@ class MatrixFederationHttpClient:
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
backoff_on_404 (bool): True if we should count a 404 response as
@@ -704,8 +708,13 @@ class MatrixFederationHttpClient:
timeout=timeout,
)
+ if timeout is not None:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
body = await _handle_json_response(
- self.reactor, self.default_timeout, request, response, start_ms
+ self.reactor, _sec_timeout, request, response, start_ms
)
return body
@@ -734,10 +743,14 @@ class MatrixFederationHttpClient:
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
@@ -801,10 +814,14 @@ class MatrixFederationHttpClient:
args (dict|None): A dictionary used to create query strings, defaults to
None.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
@@ -840,8 +857,13 @@ class MatrixFederationHttpClient:
timeout=timeout,
)
+ if timeout is not None:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
body = await _handle_json_response(
- self.reactor, self.default_timeout, request, response, start_ms
+ self.reactor, _sec_timeout, request, response, start_ms
)
return body
@@ -865,10 +887,14 @@ class MatrixFederationHttpClient:
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
@@ -900,8 +926,13 @@ class MatrixFederationHttpClient:
ignore_backoff=ignore_backoff,
)
+ if timeout is not None:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
body = await _handle_json_response(
- self.reactor, self.default_timeout, request, response, start_ms
+ self.reactor, _sec_timeout, request, response, start_ms
)
return body
diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py
index 332da02a8d..e32d3f43e0 100644
--- a/synapse/http/proxyagent.py
+++ b/synapse/http/proxyagent.py
@@ -44,8 +44,11 @@ class ProxyAgent(_AgentBase):
`BrowserLikePolicyForHTTPS`, so unless you have special
requirements you can leave this as-is.
- connectTimeout (float): The amount of time that this Agent will wait
- for the peer to accept a connection.
+ connectTimeout (Optional[float]): The amount of time that this Agent will wait
+ for the peer to accept a connection, in seconds. If 'None',
+ HostnameEndpoint's default (30s) will be used.
+
+ This is used for connections to both proxies and destination servers.
bindAddress (bytes): The local address for client sockets to bind to.
@@ -108,6 +111,15 @@ class ProxyAgent(_AgentBase):
Returns:
Deferred[IResponse]: completes when the header of the response has
been received (regardless of the response status code).
+
+ Can fail with:
+ SchemeNotSupported: if the uri is not http or https
+
+ twisted.internet.error.TimeoutError if the server we are connecting
+ to (proxy or destination) does not accept a connection before
+ connectTimeout.
+
+ ... other things too.
"""
uri = uri.strip()
if not _VALID_URI.match(uri):
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 67ce9a5f39..382f0cf3f0 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -449,18 +449,8 @@ class ReadWriteLock:
R = TypeVar("R")
-def _cancelled_to_timed_out_error(value: R, timeout: float) -> R:
- if isinstance(value, failure.Failure):
- value.trap(CancelledError)
- raise defer.TimeoutError(timeout, "Deferred")
- return value
-
-
def timeout_deferred(
- deferred: defer.Deferred,
- timeout: float,
- reactor: IReactorTime,
- on_timeout_cancel: Optional[Callable[[Any, float], Any]] = None,
+ deferred: defer.Deferred, timeout: float, reactor: IReactorTime,
) -> defer.Deferred:
"""The in built twisted `Deferred.addTimeout` fails to time out deferreds
that have a canceller that throws exceptions. This method creates a new
@@ -469,27 +459,21 @@ def timeout_deferred(
(See https://twistedmatrix.com/trac/ticket/9534)
- NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
+ NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred.
+
+ NOTE: the TimeoutError raised by the resultant deferred is
+ twisted.internet.defer.TimeoutError, which is *different* to the built-in
+ TimeoutError, as well as various other TimeoutErrors you might have imported.
Args:
deferred: The Deferred to potentially timeout.
timeout: Timeout in seconds
reactor: The twisted reactor to use
- on_timeout_cancel: A callable which is called immediately
- after the deferred times out, and not if this deferred is
- otherwise cancelled before the timeout.
- It takes an arbitrary value, which is the value of the deferred at
- that exact point in time (probably a CancelledError Failure), and
- the timeout.
-
- The default callable (if none is provided) will translate a
- CancelledError Failure into a defer.TimeoutError.
Returns:
- A new Deferred.
+ A new Deferred, which will errback with defer.TimeoutError on timeout.
"""
-
new_d = defer.Deferred()
timed_out = [False]
@@ -502,18 +486,23 @@ def timeout_deferred(
except: # noqa: E722, if we throw any exception it'll break time outs
logger.exception("Canceller failed during timeout")
+ # the cancel() call should have set off a chain of errbacks which
+ # will have errbacked new_d, but in case it hasn't, errback it now.
+
if not new_d.called:
- new_d.errback(defer.TimeoutError(timeout, "Deferred"))
+ new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,)))
delayed_call = reactor.callLater(timeout, time_it_out)
- def convert_cancelled(value):
- if timed_out[0]:
- to_call = on_timeout_cancel or _cancelled_to_timed_out_error
- return to_call(value, timeout)
+ def convert_cancelled(value: failure.Failure):
+ # if the orgininal deferred was cancelled, and our timeout has fired, then
+ # the reason it was cancelled was due to our timeout. Turn the CancelledError
+ # into a TimeoutError.
+ if timed_out[0] and value.check(CancelledError):
+ raise defer.TimeoutError("Timed out after %gs" % (timeout,))
return value
- deferred.addBoth(convert_cancelled)
+ deferred.addErrback(convert_cancelled)
def cancel_timeout(result):
# stop the pending call to cancel the deferred if it's been fired
|