diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 3880ce0d94..59b01b812c 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -16,8 +16,6 @@
import re
from twisted.internet import task
-from twisted.internet.defer import CancelledError
-from twisted.python import failure
from twisted.web.client import FileBodyProducer
from synapse.api.errors import SynapseError
@@ -26,19 +24,8 @@ from synapse.api.errors import SynapseError
class RequestTimedOutError(SynapseError):
"""Exception representing timeout of an outbound request"""
- def __init__(self):
- super(RequestTimedOutError, self).__init__(504, "Timed out")
-
-
-def cancelled_to_request_timed_out_error(value, timeout):
- """Turns CancelledErrors into RequestTimedOutErrors.
-
- For use with async.add_timeout_to_deferred
- """
- if isinstance(value, failure.Failure):
- value.trap(CancelledError)
- raise RequestTimedOutError()
- return value
+ def __init__(self, msg):
+ super().__init__(504, msg)
ACCESS_TOKEN_RE = re.compile(r"(\?.*access(_|%5[Ff])token=)[^&]*(.*)$")
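With `cancelled_to_request_timed_out_error` removed, timeout translation now happens inside the HTTP clients (see `_timeout_to_request_timed_out_error` later in this diff); from a caller's point of view the exception simply carries a more specific message. A hedged sketch of that caller's view (`fetch_summary` and the URI are hypothetical; `code`/`msg` are SynapseError's usual attributes):

```python
# Hedged sketch, not part of the patch: handling the message-carrying
# RequestTimedOutError from application code. fetch_summary is hypothetical.
from synapse.http import RequestTimedOutError

async def fetch_summary(client, uri: str) -> dict:
    try:
        return await client.get_json(uri)
    except RequestTimedOutError as e:
        # e.code is always 504; e.msg now says whether we timed out while
        # connecting or while waiting for response headers.
        return {"error": e.msg}
```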
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 13fcab3378..8324632cb6 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -13,10 +13,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
import urllib
from io import BytesIO
+from typing import (
+ Any,
+ BinaryIO,
+ Dict,
+ Iterable,
+ List,
+ Mapping,
+ Optional,
+ Sequence,
+ Tuple,
+ Union,
+)
import treq
from canonicaljson import encode_canonical_json
@@ -26,7 +37,7 @@ from zope.interface import implementer, provider
from OpenSSL import SSL
from OpenSSL.SSL import VERIFY_NONE
-from twisted.internet import defer, protocol, ssl
+from twisted.internet import defer, error as twisted_error, protocol, ssl
from twisted.internet.interfaces import (
IReactorPluggableNameResolver,
IResolutionReceiver,
@@ -34,16 +45,18 @@ from twisted.internet.interfaces import (
from twisted.internet.task import Cooperator
from twisted.python.failure import Failure
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool, readBody
+from twisted.web.client import (
+ Agent,
+ HTTPConnectionPool,
+ ResponseNeverReceived,
+ readBody,
+)
from twisted.web.http import PotentialDataLoss
from twisted.web.http_headers import Headers
+from twisted.web.iweb import IResponse
from synapse.api.errors import Codes, HttpResponseException, SynapseError
-from synapse.http import (
- QuieterFileBodyProducer,
- cancelled_to_request_timed_out_error,
- redact_uri,
-)
+from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
from synapse.http.proxyagent import ProxyAgent
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag, start_active_span, tags
@@ -57,6 +70,19 @@ incoming_responses_counter = Counter(
"synapse_http_client_responses", "", ["method", "code"]
)
+# the type of the headers mapping, to be passed to t.w.h.Headers.
+# Actually we can mix str and bytes keys, but Mapping treats 'key' as invariant so
+# we simplify.
+RawHeaders = Union[Mapping[str, "RawHeaderValue"], Mapping[bytes, "RawHeaderValue"]]
+
+# the value actually has to be a List, but List is invariant, so we use a Sequence
+# whose entries can be either strs or bytes.
+RawHeaderValue = Sequence[Union[str, bytes]]
+
+# the type of the query params, to be passed into `urlencode`
+QueryParamValue = Union[str, bytes, Iterable[Union[str, bytes]]]
+QueryParams = Union[Mapping[str, QueryParamValue], Mapping[bytes, QueryParamValue]]
+
def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
"""
@@ -285,16 +311,27 @@ class SimpleHttpClient:
ip_blacklist=self._ip_blacklist,
)
- async def request(self, method, uri, data=None, headers=None):
+ async def request(
+ self,
+ method: str,
+ uri: str,
+ data: Optional[bytes] = None,
+ headers: Optional[Headers] = None,
+ ) -> IResponse:
"""
Args:
- method (str): HTTP method to use.
- uri (str): URI to query.
- data (bytes): Data to send in the request body, if applicable.
- headers (t.w.http_headers.Headers): Request headers.
+ method: HTTP method to use.
+ uri: URI to query.
+ data: Data to send in the request body, if applicable.
+ headers: Request headers.
+
+ Returns:
+ Response object, once the headers have been read.
+
+ Raises:
+ RequestTimedOutError if the request times out before the headers are read
+
"""
- # A small wrapper around self.agent.request() so we can easily attach
- # counters to it
outgoing_requests_counter.labels(method).inc()
# log request but strip `access_token` (AS requests for example include this)
@@ -323,13 +360,17 @@ class SimpleHttpClient:
data=body_producer,
headers=headers,
**self._extra_treq_args
- )
+ ) # type: defer.Deferred
+
+ # we use our own timeout mechanism rather than treq's as a workaround
+ # for https://twistedmatrix.com/trac/ticket/9534.
request_deferred = timeout_deferred(
- request_deferred,
- 60,
- self.hs.get_reactor(),
- cancelled_to_request_timed_out_error,
+ request_deferred, 60, self.hs.get_reactor(),
)
+
+ # turn timeouts into RequestTimedOutErrors
+ request_deferred.addErrback(_timeout_to_request_timed_out_error)
+
response = await make_deferred_yieldable(request_deferred)
incoming_responses_counter.labels(method, response.code).inc()
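For context, the 60-second `timeout_deferred()` call above bounds the whole wait for response headers. A minimal hedged stand-in for that helper is sketched below (`with_timeout` is hypothetical; Synapse's real `timeout_deferred` additionally converts the resulting `CancelledError` into `defer.TimeoutError`, which is exactly what the new errback checks for):

```python
# Hedged, simplified stand-in for timeout_deferred(): cancel the Deferred if it
# has not fired within `seconds`, and stop the timer once it does fire.
from twisted.internet import defer
from twisted.internet.interfaces import IReactorTime

def with_timeout(d: defer.Deferred, seconds: float, reactor: IReactorTime) -> defer.Deferred:
    delayed_call = reactor.callLater(seconds, d.cancel)

    def _stop_timer(result):
        if delayed_call.active():
            delayed_call.cancel()
        return result

    return d.addBoth(_stop_timer)
```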
@@ -353,18 +394,26 @@ class SimpleHttpClient:
set_tag("error_reason", e.args[0])
raise
- async def post_urlencoded_get_json(self, uri, args={}, headers=None):
+ async def post_urlencoded_get_json(
+ self,
+ uri: str,
+ args: Mapping[str, Union[str, List[str]]] = {},
+ headers: Optional[RawHeaders] = None,
+ ) -> Any:
"""
Args:
- uri (str):
- args (dict[str, str|List[str]]): query params
- headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
- header name to a list of values for that header
+ uri: uri to query
+ args: parameters to be url-encoded in the body
+ headers: a map from header name to a list of values for that header
Returns:
- object: parsed json
+ parsed json
Raises:
+ RequestTimedOutError: if there is a timeout before the response headers
+ are received. Note there is currently no timeout on reading the response
+ body.
+
HttpResponseException: On a non-2xx HTTP response.
ValueError: if the response was not JSON
@@ -398,19 +447,24 @@ class SimpleHttpClient:
response.code, response.phrase.decode("ascii", errors="replace"), body
)
- async def post_json_get_json(self, uri, post_json, headers=None):
+ async def post_json_get_json(
+ self, uri: str, post_json: Any, headers: Optional[RawHeaders] = None
+ ) -> Any:
"""
Args:
- uri (str):
- post_json (object):
- headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
- header name to a list of values for that header
+ uri: URI to query.
+ post_json: request body, to be encoded as json
+ headers: a map from header name to a list of values for that header
Returns:
- object: parsed json
+ parsed json
Raises:
+ RequestTimedOutError: if there is a timeout before the response headers
+ are received. Note there is currently no timeout on reading the response
+ body.
+
HttpResponseException: On a non-2xx HTTP response.
ValueError: if the response was not JSON
@@ -440,21 +494,22 @@ class SimpleHttpClient:
response.code, response.phrase.decode("ascii", errors="replace"), body
)
- async def get_json(self, uri, args={}, headers=None):
- """ Gets some json from the given URI.
+ async def get_json(
+ self, uri: str, args: QueryParams = {}, headers: Optional[RawHeaders] = None,
+ ) -> Any:
+ """Gets some json from the given URI.
Args:
- uri (str): The URI to request, not including query parameters
- args (dict): A dictionary used to create query strings, defaults to
- None.
- **Note**: The value of each key is assumed to be an iterable
- and *not* a string.
- headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
- header name to a list of values for that header
+ uri: The URI to request, not including query parameters
+ args: A dictionary used to create query string
+ headers: a map from header name to a list of values for that header
Returns:
- Succeeds when we get *any* 2xx HTTP response, with the
- HTTP body as JSON.
+ Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
Raises:
+ RequestTimedOutError: if there is a timeout before the response headers
+ are received. Note there is currently no timeout on reading the response
+ body.
+
HttpResponseException On a non-2xx HTTP response.
ValueError: if the response was not JSON
@@ -466,22 +521,27 @@ class SimpleHttpClient:
body = await self.get_raw(uri, args, headers=headers)
return json_decoder.decode(body.decode("utf-8"))
- async def put_json(self, uri, json_body, args={}, headers=None):
- """ Puts some json to the given URI.
+ async def put_json(
+ self,
+ uri: str,
+ json_body: Any,
+ args: QueryParams = {},
+        headers: Optional[RawHeaders] = None,
+ ) -> Any:
+ """Puts some json to the given URI.
Args:
- uri (str): The URI to request, not including query parameters
- json_body (dict): The JSON to put in the HTTP body,
- args (dict): A dictionary used to create query strings, defaults to
- None.
- **Note**: The value of each key is assumed to be an iterable
- and *not* a string.
- headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
- header name to a list of values for that header
+ uri: The URI to request, not including query parameters
+            json_body: The JSON to put in the HTTP body
+ args: A dictionary used to create query strings
+ headers: a map from header name to a list of values for that header
Returns:
- Succeeds when we get *any* 2xx HTTP response, with the
- HTTP body as JSON.
+ Succeeds when we get a 2xx HTTP response, with the HTTP body as JSON.
Raises:
+ RequestTimedOutError: if there is a timeout before the response headers
+ are received. Note there is currently no timeout on reading the response
+ body.
+
HttpResponseException On a non-2xx HTTP response.
ValueError: if the response was not JSON
@@ -513,21 +573,23 @@ class SimpleHttpClient:
response.code, response.phrase.decode("ascii", errors="replace"), body
)
- async def get_raw(self, uri, args={}, headers=None):
- """ Gets raw text from the given URI.
+ async def get_raw(
+ self, uri: str, args: QueryParams = {}, headers: Optional[RawHeaders] = None
+ ) -> bytes:
+ """Gets raw text from the given URI.
Args:
- uri (str): The URI to request, not including query parameters
- args (dict): A dictionary used to create query strings, defaults to
- None.
- **Note**: The value of each key is assumed to be an iterable
- and *not* a string.
- headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
- header name to a list of values for that header
+ uri: The URI to request, not including query parameters
+ args: A dictionary used to create query strings
+ headers: a map from header name to a list of values for that header
Returns:
- Succeeds when we get *any* 2xx HTTP response, with the
+ Succeeds when we get a 2xx HTTP response, with the
HTTP body as bytes.
Raises:
+ RequestTimedOutError: if there is a timeout before the response headers
+ are received. Note there is currently no timeout on reading the response
+ body.
+
HttpResponseException on a non-2xx HTTP response.
"""
if len(args):
@@ -552,16 +614,29 @@ class SimpleHttpClient:
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
# The two should be factored out.
- async def get_file(self, url, output_stream, max_size=None, headers=None):
+ async def get_file(
+ self,
+ url: str,
+ output_stream: BinaryIO,
+ max_size: Optional[int] = None,
+ headers: Optional[RawHeaders] = None,
+ ) -> Tuple[int, Dict[bytes, List[bytes]], str, int]:
"""GETs a file from a given URL
Args:
- url (str): The URL to GET
- output_stream (file): File to write the response body to.
- headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
- header name to a list of values for that header
+ url: The URL to GET
+ output_stream: File to write the response body to.
+ headers: A map from header name to a list of values for that header
Returns:
- A (int,dict,string,int) tuple of the file length, dict of the response
+ A tuple of the file length, dict of the response
headers, absolute URI of the response and HTTP response code.
+
+ Raises:
+ RequestTimedOutError: if there is a timeout before the response headers
+ are received. Note there is currently no timeout on reading the response
+ body.
+
+ SynapseError: if the response is not a 2xx, the remote file is too large, or
+ another exception happens during the download.
"""
actual_headers = {b"User-Agent": [self.user_agent]}
@@ -609,6 +684,18 @@ class SimpleHttpClient:
)
+def _timeout_to_request_timed_out_error(f: Failure):
+ if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
+ # The TCP connection has its own timeout (set by the 'connectTimeout' param
+        # on the Agent), which raises a twisted_error.TimeoutError exception.
+ raise RequestTimedOutError("Timeout connecting to remote server")
+ elif f.check(defer.TimeoutError, ResponseNeverReceived):
+ # this one means that we hit our overall timeout on the request
+ raise RequestTimedOutError("Timeout waiting for response from remote server")
+
+ return f
+
+
# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
# The two should be factored out.
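Before the next file, a hedged caller-side sketch of the new `QueryParams` / `RawHeaders` aliases (the `client` variable and endpoint URL are hypothetical): query values may be a single str/bytes or an iterable of them, since the dict is ultimately passed to `urllib.parse.urlencode`, while each header maps to a list of values, matching what `twisted.web.http_headers.Headers` expects.

```python
# Hedged caller-side sketch; the endpoint URL and client variable are hypothetical.
from typing import Any

async def list_recent_events(client) -> Any:
    args = {"limit": "10", "types": ["m.room.message", "m.room.member"]}  # QueryParams
    headers = {"Accept": ["application/json"]}                            # RawHeaders
    return await client.get_json(
        "https://example.com/_matrix/some/endpoint", args=args, headers=headers
    )
```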
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index e6f067ca29..a306faa267 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -311,7 +311,7 @@ def _parse_cache_control(headers: Headers) -> Dict[bytes, Optional[bytes]]:
return cache_controls
-@attr.s()
+@attr.s(slots=True)
class _FetchWellKnownFailure(Exception):
# True if we didn't get a non-5xx HTTP response, i.e. this may or may not be
# a temporary failure.
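The `slots=True` additions here and in matrixfederationclient.py below change memory layout rather than behaviour; roughly, as in this hedged illustration:

```python
# Hedged illustration of what attr.s(slots=True) buys: instances get __slots__
# instead of a per-instance __dict__, so a misspelt attribute fails loudly.
import attr

@attr.s(slots=True)
class _Example:
    temporary = attr.ib(type=bool)

e = _Example(temporary=True)
e.temporary = False    # fine: a declared attribute
# e.temporray = False  # would raise AttributeError, since slots classes have no __dict__
```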
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 5eaf3151ce..c23a4d7c0c 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -76,7 +76,7 @@ MAXINT = sys.maxsize
_next_id = 1
-@attr.s(frozen=True)
+@attr.s(slots=True, frozen=True)
class MatrixFederationRequest:
method = attr.ib()
"""HTTP method
@@ -171,7 +171,7 @@ async def _handle_json_response(
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
body = await make_deferred_yieldable(d)
- except TimeoutError as e:
+ except defer.TimeoutError as e:
logger.warning(
"{%s} [%s] Timed out reading response - %s %s",
request.txn_id,
@@ -473,8 +473,6 @@ class MatrixFederationHttpClient:
)
response = await request_deferred
- except TimeoutError as e:
- raise RequestSendFailed(e, can_retry=True) from e
except DNSLookupError as e:
raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
except Exception as e:
@@ -657,10 +655,14 @@ class MatrixFederationHttpClient:
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
backoff_on_404 (bool): True if we should count a 404 response as
@@ -706,8 +708,13 @@ class MatrixFederationHttpClient:
timeout=timeout,
)
+ if timeout is not None:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
body = await _handle_json_response(
- self.reactor, self.default_timeout, request, response, start_ms
+ self.reactor, _sec_timeout, request, response, start_ms
)
return body
@@ -736,10 +743,14 @@ class MatrixFederationHttpClient:
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
@@ -803,10 +814,14 @@ class MatrixFederationHttpClient:
args (dict|None): A dictionary used to create query strings, defaults to
None.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
@@ -842,8 +857,13 @@ class MatrixFederationHttpClient:
timeout=timeout,
)
+ if timeout is not None:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
body = await _handle_json_response(
- self.reactor, self.default_timeout, request, response, start_ms
+ self.reactor, _sec_timeout, request, response, start_ms
)
return body
@@ -867,10 +887,14 @@ class MatrixFederationHttpClient:
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
@@ -902,8 +926,13 @@ class MatrixFederationHttpClient:
ignore_backoff=ignore_backoff,
)
+ if timeout is not None:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
body = await _handle_json_response(
- self.reactor, self.default_timeout, request, response, start_ms
+ self.reactor, _sec_timeout, request, response, start_ms
)
return body
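The three repeated `_sec_timeout` blocks above all apply the same millisecond-to-second conversion before `_handle_json_response`, so a caller-supplied timeout now also governs reading the response body instead of always falling back to the 60s default. A hedged distillation (the helper name is hypothetical, not part of the patch):

```python
# Hedged distillation of the conversion applied before _handle_json_response;
# _response_timeout_sec is a hypothetical helper used only for illustration.
from typing import Optional

def _response_timeout_sec(timeout_ms: Optional[int], default_timeout_sec: float) -> float:
    if timeout_ms is not None:
        return timeout_ms / 1000
    return default_timeout_sec

assert _response_timeout_sec(30000, 60) == 30.0
assert _response_timeout_sec(None, 60) == 60
```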
diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py
index 332da02a8d..e32d3f43e0 100644
--- a/synapse/http/proxyagent.py
+++ b/synapse/http/proxyagent.py
@@ -44,8 +44,11 @@ class ProxyAgent(_AgentBase):
`BrowserLikePolicyForHTTPS`, so unless you have special
requirements you can leave this as-is.
- connectTimeout (float): The amount of time that this Agent will wait
- for the peer to accept a connection.
+ connectTimeout (Optional[float]): The amount of time that this Agent will wait
+ for the peer to accept a connection, in seconds. If 'None',
+ HostnameEndpoint's default (30s) will be used.
+
+ This is used for connections to both proxies and destination servers.
bindAddress (bytes): The local address for client sockets to bind to.
@@ -108,6 +111,15 @@ class ProxyAgent(_AgentBase):
Returns:
Deferred[IResponse]: completes when the header of the response has
been received (regardless of the response status code).
+
+ Can fail with:
+ SchemeNotSupported: if the uri is not http or https
+
+ twisted.internet.error.TimeoutError if the server we are connecting
+ to (proxy or destination) does not accept a connection before
+ connectTimeout.
+
+ ... other things too.
"""
uri = uri.strip()
if not _VALID_URI.match(uri):
|