diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 148eeb19dc..c23a4d7c0c 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -29,10 +29,11 @@ from zope.interface import implementer
from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
-from twisted.internet.interfaces import IReactorPluggableNameResolver
+from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime
from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone
from twisted.web.http_headers import Headers
+from twisted.web.iweb import IResponse
import synapse.metrics
import synapse.util.retryutils
@@ -53,6 +54,7 @@ from synapse.logging.opentracing import (
start_active_span,
tags,
)
+from synapse.util import json_decoder
from synapse.util.async_helpers import timeout_deferred
from synapse.util.metrics import Measure
@@ -74,8 +76,8 @@ MAXINT = sys.maxsize
_next_id = 1
-@attr.s
-class MatrixFederationRequest(object):
+@attr.s(slots=True, frozen=True)
+class MatrixFederationRequest:
method = attr.ib()
"""HTTP method
:type: str
@@ -110,27 +112,52 @@ class MatrixFederationRequest(object):
:type: str|None
"""
+ uri = attr.ib(init=False, type=bytes)
+ """The URI of this request
+ """
+
def __attrs_post_init__(self):
global _next_id
- self.txn_id = "%s-O-%s" % (self.method, _next_id)
+ txn_id = "%s-O-%s" % (self.method, _next_id)
_next_id = (_next_id + 1) % (MAXINT - 1)
+ object.__setattr__(self, "txn_id", txn_id)
+
+ destination_bytes = self.destination.encode("ascii")
+ path_bytes = self.path.encode("ascii")
+ if self.query:
+ query_bytes = encode_query_args(self.query)
+ else:
+ query_bytes = b""
+
+ # The object is frozen so we can pre-compute this.
+ uri = urllib.parse.urlunparse(
+ (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
+ )
+ object.__setattr__(self, "uri", uri)
+
def get_json(self):
if self.json_callback:
return self.json_callback()
return self.json
-@defer.inlineCallbacks
-def _handle_json_response(reactor, timeout_sec, request, response):
+async def _handle_json_response(
+ reactor: IReactorTime,
+ timeout_sec: float,
+ request: MatrixFederationRequest,
+ response: IResponse,
+ start_ms: int,
+):
"""
Reads the JSON body of a response, with a timeout
Args:
- reactor (IReactor): twisted reactor, for the timeout
- timeout_sec (float): number of seconds to wait for response to complete
- request (MatrixFederationRequest): the request that triggered the response
- response (IResponse): response to the request
+ reactor: twisted reactor, for the timeout
+ timeout_sec: number of seconds to wait for response to complete
+ request: the request that triggered the response
+ response: response to the request
+ start_ms: Timestamp when request was made
Returns:
dict: parsed JSON response
@@ -138,34 +165,48 @@ def _handle_json_response(reactor, timeout_sec, request, response):
try:
check_content_type_is_json(response.headers)
- d = treq.json_content(response)
+ # Use the custom JSON decoder (partially re-implements treq.json_content).
+ d = treq.text_content(response, encoding="utf-8")
+ d.addCallback(json_decoder.decode)
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
- body = yield make_deferred_yieldable(d)
- except TimeoutError as e:
+ body = await make_deferred_yieldable(d)
+ except defer.TimeoutError as e:
logger.warning(
- "{%s} [%s] Timed out reading response", request.txn_id, request.destination,
+ "{%s} [%s] Timed out reading response - %s %s",
+ request.txn_id,
+ request.destination,
+ request.method,
+ request.uri.decode("ascii"),
)
raise RequestSendFailed(e, can_retry=True) from e
except Exception as e:
logger.warning(
- "{%s} [%s] Error reading response: %s",
+ "{%s} [%s] Error reading response %s %s: %s",
request.txn_id,
request.destination,
+ request.method,
+ request.uri.decode("ascii"),
e,
)
raise
+
+ time_taken_secs = reactor.seconds() - start_ms / 1000
+
logger.info(
- "{%s} [%s] Completed: %d %s",
+ "{%s} [%s] Completed request: %d %s in %.2f secs - %s %s",
request.txn_id,
request.destination,
response.code,
response.phrase.decode("ascii", errors="replace"),
+ time_taken_secs,
+ request.method,
+ request.uri.decode("ascii"),
)
return body
-class MatrixFederationHttpClient(object):
+class MatrixFederationHttpClient:
"""HTTP client used to talk to other homeservers over the federation
protocol. Send client certificates and signs requests.
@@ -188,7 +229,7 @@ class MatrixFederationHttpClient(object):
)
@implementer(IReactorPluggableNameResolver)
- class Reactor(object):
+ class Reactor:
def __getattr__(_self, attr):
if attr == "nameResolver":
return nameResolver
@@ -224,8 +265,7 @@ class MatrixFederationHttpClient(object):
self._cooperator = Cooperator(scheduler=schedule)
- @defer.inlineCallbacks
- def _send_request_with_optional_trailing_slash(
+ async def _send_request_with_optional_trailing_slash(
self, request, try_trailing_slash_on_400=False, **send_request_args
):
"""Wrapper for _send_request which can optionally retry the request
@@ -246,10 +286,10 @@ class MatrixFederationHttpClient(object):
(except 429).
Returns:
- Deferred[Dict]: Parsed JSON response body.
+ Dict: Parsed JSON response body.
"""
try:
- response = yield self._send_request(request, **send_request_args)
+ response = await self._send_request(request, **send_request_args)
except HttpResponseException as e:
# Received an HTTP error > 300. Check if it meets the requirements
# to retry with a trailing slash
@@ -263,14 +303,15 @@ class MatrixFederationHttpClient(object):
# 'M_UNRECOGNIZED' which some endpoints can return when omitting a
# trailing slash on Synapse <= v0.99.3.
logger.info("Retrying request with trailing slash")
- request.path += "/"
- response = yield self._send_request(request, **send_request_args)
+ # Request is frozen so we create a new instance
+ request = attr.evolve(request, path=request.path + "/")
+
+ response = await self._send_request(request, **send_request_args)
return response
- @defer.inlineCallbacks
- def _send_request(
+ async def _send_request(
self,
request,
retry_on_dns_fail=True,
@@ -311,7 +352,7 @@ class MatrixFederationHttpClient(object):
backoff_on_404 (bool): Back off if we get a 404
Returns:
- Deferred[twisted.web.client.Response]: resolves with the HTTP
+ twisted.web.client.Response: resolves with the HTTP
response object on success.
Raises:
@@ -335,7 +376,7 @@ class MatrixFederationHttpClient(object):
):
raise FederationDeniedError(request.destination)
- limiter = yield synapse.util.retryutils.get_retry_limiter(
+ limiter = await synapse.util.retryutils.get_retry_limiter(
request.destination,
self.clock,
self._store,
@@ -376,9 +417,7 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
- url_bytes = urllib.parse.urlunparse(
- (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
- )
+ url_bytes = request.uri
url_str = url_bytes.decode("ascii")
url_to_sign_bytes = urllib.parse.urlunparse(
@@ -405,7 +444,7 @@ class MatrixFederationHttpClient(object):
headers_dict[b"Authorization"] = auth_headers
- logger.info(
+ logger.debug(
"{%s} [%s] Sending request: %s %s; timeout %fs",
request.txn_id,
request.destination,
@@ -433,13 +472,10 @@ class MatrixFederationHttpClient(object):
reactor=self.reactor,
)
- response = yield request_deferred
- except TimeoutError as e:
- raise RequestSendFailed(e, can_retry=True) from e
+ response = await request_deferred
except DNSLookupError as e:
raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
except Exception as e:
- logger.info("Failed to send request: %s", e)
raise RequestSendFailed(e, can_retry=True) from e
incoming_responses_counter.labels(
@@ -447,6 +483,7 @@ class MatrixFederationHttpClient(object):
).inc()
set_tag(tags.HTTP_STATUS_CODE, response.code)
+ response_phrase = response.phrase.decode("ascii", errors="replace")
if 200 <= response.code < 300:
logger.debug(
@@ -454,7 +491,7 @@ class MatrixFederationHttpClient(object):
request.txn_id,
request.destination,
response.code,
- response.phrase.decode("ascii", errors="replace"),
+ response_phrase,
)
pass
else:
@@ -463,7 +500,7 @@ class MatrixFederationHttpClient(object):
request.txn_id,
request.destination,
response.code,
- response.phrase.decode("ascii", errors="replace"),
+ response_phrase,
)
# :'(
# Update transactions table?
@@ -473,7 +510,7 @@ class MatrixFederationHttpClient(object):
)
try:
- body = yield make_deferred_yieldable(d)
+ body = await make_deferred_yieldable(d)
except Exception as e:
# Eh, we're already going to raise an exception so lets
# ignore if this fails.
@@ -487,7 +524,7 @@ class MatrixFederationHttpClient(object):
)
body = None
- e = HttpResponseException(response.code, response.phrase, body)
+ e = HttpResponseException(response.code, response_phrase, body)
# Retry if the error is a 429 (Too Many Requests),
# otherwise just raise a standard HttpResponseException
@@ -498,7 +535,7 @@ class MatrixFederationHttpClient(object):
break
except RequestSendFailed as e:
- logger.warning(
+ logger.info(
"{%s} [%s] Request failed: %s %s: %s",
request.txn_id,
request.destination,
@@ -527,7 +564,7 @@ class MatrixFederationHttpClient(object):
delay,
)
- yield self.clock.sleep(delay)
+ await self.clock.sleep(delay)
retries_left -= 1
else:
raise
@@ -590,8 +627,7 @@ class MatrixFederationHttpClient(object):
)
return auth_headers
- @defer.inlineCallbacks
- def put_json(
+ async def put_json(
self,
destination,
path,
@@ -619,10 +655,14 @@ class MatrixFederationHttpClient(object):
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
backoff_on_404 (bool): True if we should count a 404 response as
@@ -635,7 +675,7 @@ class MatrixFederationHttpClient(object):
enabled.
Returns:
- Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+ dict|list: Succeeds when we get a 2xx HTTP response. The
result will be the decoded JSON body.
Raises:
@@ -657,7 +697,9 @@ class MatrixFederationHttpClient(object):
json=data,
)
- response = yield self._send_request_with_optional_trailing_slash(
+ start_ms = self.clock.time_msec()
+
+ response = await self._send_request_with_optional_trailing_slash(
request,
try_trailing_slash_on_400,
backoff_on_404=backoff_on_404,
@@ -666,14 +708,18 @@ class MatrixFederationHttpClient(object):
timeout=timeout,
)
- body = yield _handle_json_response(
- self.reactor, self.default_timeout, request, response
+ if timeout is not None:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
+ body = await _handle_json_response(
+ self.reactor, _sec_timeout, request, response, start_ms
)
return body
- @defer.inlineCallbacks
- def post_json(
+ async def post_json(
self,
destination,
path,
@@ -697,16 +743,20 @@ class MatrixFederationHttpClient(object):
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
args (dict): query params
Returns:
- Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+ dict|list: Succeeds when we get a 2xx HTTP response. The
result will be the decoded JSON body.
Raises:
@@ -724,7 +774,9 @@ class MatrixFederationHttpClient(object):
method="POST", destination=destination, path=path, query=args, json=data
)
- response = yield self._send_request(
+ start_ms = self.clock.time_msec()
+
+ response = await self._send_request(
request,
long_retries=long_retries,
timeout=timeout,
@@ -736,13 +788,12 @@ class MatrixFederationHttpClient(object):
else:
_sec_timeout = self.default_timeout
- body = yield _handle_json_response(
- self.reactor, _sec_timeout, request, response
+ body = await _handle_json_response(
+ self.reactor, _sec_timeout, request, response, start_ms,
)
return body
- @defer.inlineCallbacks
- def get_json(
+ async def get_json(
self,
destination,
path,
@@ -763,10 +814,14 @@ class MatrixFederationHttpClient(object):
args (dict|None): A dictionary used to create query strings, defaults to
None.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
@@ -774,7 +829,7 @@ class MatrixFederationHttpClient(object):
response we should try appending a trailing slash to the end of
the request. Workaround for #3622 in Synapse <= v0.99.3.
Returns:
- Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+ dict|list: Succeeds when we get a 2xx HTTP response. The
result will be the decoded JSON body.
Raises:
@@ -791,7 +846,9 @@ class MatrixFederationHttpClient(object):
method="GET", destination=destination, path=path, query=args
)
- response = yield self._send_request_with_optional_trailing_slash(
+ start_ms = self.clock.time_msec()
+
+ response = await self._send_request_with_optional_trailing_slash(
request,
try_trailing_slash_on_400,
backoff_on_404=False,
@@ -800,14 +857,18 @@ class MatrixFederationHttpClient(object):
timeout=timeout,
)
- body = yield _handle_json_response(
- self.reactor, self.default_timeout, request, response
+ if timeout is not None:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
+ body = await _handle_json_response(
+ self.reactor, _sec_timeout, request, response, start_ms
)
return body
- @defer.inlineCallbacks
- def delete_json(
+ async def delete_json(
self,
destination,
path,
@@ -826,16 +887,20 @@ class MatrixFederationHttpClient(object):
long_retries (bool): whether to use the long retry algorithm. See
docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
+ timeout (int|None): number of milliseconds to wait for the response.
self._default_timeout (60s) by default.
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
args (dict): query params
Returns:
- Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+ dict|list: Succeeds when we get a 2xx HTTP response. The
result will be the decoded JSON body.
Raises:
@@ -852,20 +917,26 @@ class MatrixFederationHttpClient(object):
method="DELETE", destination=destination, path=path, query=args
)
- response = yield self._send_request(
+ start_ms = self.clock.time_msec()
+
+ response = await self._send_request(
request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
- body = yield _handle_json_response(
- self.reactor, self.default_timeout, request, response
+ if timeout is not None:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
+ body = await _handle_json_response(
+ self.reactor, _sec_timeout, request, response, start_ms
)
return body
- @defer.inlineCallbacks
- def get_file(
+ async def get_file(
self,
destination,
path,
@@ -885,7 +956,7 @@ class MatrixFederationHttpClient(object):
and try the request anyway.
Returns:
- Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of
+ tuple[int, dict]: Resolves with an (int,dict) tuple of
the file length and a dict of the response headers.
Raises:
@@ -902,7 +973,7 @@ class MatrixFederationHttpClient(object):
method="GET", destination=destination, path=path, query=args
)
- response = yield self._send_request(
+ response = await self._send_request(
request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
)
@@ -911,7 +982,7 @@ class MatrixFederationHttpClient(object):
try:
d = _readBodyToFile(response, output_stream, max_size)
d.addTimeout(self.default_timeout, self.reactor)
- length = yield make_deferred_yieldable(d)
+ length = await make_deferred_yieldable(d)
except Exception as e:
logger.warning(
"{%s} [%s] Error reading response: %s",
@@ -921,12 +992,14 @@ class MatrixFederationHttpClient(object):
)
raise
logger.info(
- "{%s} [%s] Completed: %d %s [%d bytes]",
+ "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
request.txn_id,
request.destination,
response.code,
response.phrase.decode("ascii", errors="replace"),
length,
+ request.method,
+ request.uri.decode("ascii"),
)
return (length, headers)
|