summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py233
1 files changed, 153 insertions, 80 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py

index 148eeb19dc..c23a4d7c0c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py
@@ -29,10 +29,11 @@ from zope.interface import implementer from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError -from twisted.internet.interfaces import IReactorPluggableNameResolver +from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone from twisted.web.http_headers import Headers +from twisted.web.iweb import IResponse import synapse.metrics import synapse.util.retryutils @@ -53,6 +54,7 @@ from synapse.logging.opentracing import ( start_active_span, tags, ) +from synapse.util import json_decoder from synapse.util.async_helpers import timeout_deferred from synapse.util.metrics import Measure @@ -74,8 +76,8 @@ MAXINT = sys.maxsize _next_id = 1 -@attr.s -class MatrixFederationRequest(object): +@attr.s(slots=True, frozen=True) +class MatrixFederationRequest: method = attr.ib() """HTTP method :type: str @@ -110,27 +112,52 @@ class MatrixFederationRequest(object): :type: str|None """ + uri = attr.ib(init=False, type=bytes) + """The URI of this request + """ + def __attrs_post_init__(self): global _next_id - self.txn_id = "%s-O-%s" % (self.method, _next_id) + txn_id = "%s-O-%s" % (self.method, _next_id) _next_id = (_next_id + 1) % (MAXINT - 1) + object.__setattr__(self, "txn_id", txn_id) + + destination_bytes = self.destination.encode("ascii") + path_bytes = self.path.encode("ascii") + if self.query: + query_bytes = encode_query_args(self.query) + else: + query_bytes = b"" + + # The object is frozen so we can pre-compute this. + uri = urllib.parse.urlunparse( + (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") + ) + object.__setattr__(self, "uri", uri) + def get_json(self): if self.json_callback: return self.json_callback() return self.json -@defer.inlineCallbacks -def _handle_json_response(reactor, timeout_sec, request, response): +async def _handle_json_response( + reactor: IReactorTime, + timeout_sec: float, + request: MatrixFederationRequest, + response: IResponse, + start_ms: int, +): """ Reads the JSON body of a response, with a timeout Args: - reactor (IReactor): twisted reactor, for the timeout - timeout_sec (float): number of seconds to wait for response to complete - request (MatrixFederationRequest): the request that triggered the response - response (IResponse): response to the request + reactor: twisted reactor, for the timeout + timeout_sec: number of seconds to wait for response to complete + request: the request that triggered the response + response: response to the request + start_ms: Timestamp when request was made Returns: dict: parsed JSON response @@ -138,34 +165,48 @@ def _handle_json_response(reactor, timeout_sec, request, response): try: check_content_type_is_json(response.headers) - d = treq.json_content(response) + # Use the custom JSON decoder (partially re-implements treq.json_content). + d = treq.text_content(response, encoding="utf-8") + d.addCallback(json_decoder.decode) d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) - body = yield make_deferred_yieldable(d) - except TimeoutError as e: + body = await make_deferred_yieldable(d) + except defer.TimeoutError as e: logger.warning( - "{%s} [%s] Timed out reading response", request.txn_id, request.destination, + "{%s} [%s] Timed out reading response - %s %s", + request.txn_id, + request.destination, + request.method, + request.uri.decode("ascii"), ) raise RequestSendFailed(e, can_retry=True) from e except Exception as e: logger.warning( - "{%s} [%s] Error reading response: %s", + "{%s} [%s] Error reading response %s %s: %s", request.txn_id, request.destination, + request.method, + request.uri.decode("ascii"), e, ) raise + + time_taken_secs = reactor.seconds() - start_ms / 1000 + logger.info( - "{%s} [%s] Completed: %d %s", + "{%s} [%s] Completed request: %d %s in %.2f secs - %s %s", request.txn_id, request.destination, response.code, response.phrase.decode("ascii", errors="replace"), + time_taken_secs, + request.method, + request.uri.decode("ascii"), ) return body -class MatrixFederationHttpClient(object): +class MatrixFederationHttpClient: """HTTP client used to talk to other homeservers over the federation protocol. Send client certificates and signs requests. @@ -188,7 +229,7 @@ class MatrixFederationHttpClient(object): ) @implementer(IReactorPluggableNameResolver) - class Reactor(object): + class Reactor: def __getattr__(_self, attr): if attr == "nameResolver": return nameResolver @@ -224,8 +265,7 @@ class MatrixFederationHttpClient(object): self._cooperator = Cooperator(scheduler=schedule) - @defer.inlineCallbacks - def _send_request_with_optional_trailing_slash( + async def _send_request_with_optional_trailing_slash( self, request, try_trailing_slash_on_400=False, **send_request_args ): """Wrapper for _send_request which can optionally retry the request @@ -246,10 +286,10 @@ class MatrixFederationHttpClient(object): (except 429). Returns: - Deferred[Dict]: Parsed JSON response body. + Dict: Parsed JSON response body. """ try: - response = yield self._send_request(request, **send_request_args) + response = await self._send_request(request, **send_request_args) except HttpResponseException as e: # Received an HTTP error > 300. Check if it meets the requirements # to retry with a trailing slash @@ -263,14 +303,15 @@ class MatrixFederationHttpClient(object): # 'M_UNRECOGNIZED' which some endpoints can return when omitting a # trailing slash on Synapse <= v0.99.3. logger.info("Retrying request with trailing slash") - request.path += "/" - response = yield self._send_request(request, **send_request_args) + # Request is frozen so we create a new instance + request = attr.evolve(request, path=request.path + "/") + + response = await self._send_request(request, **send_request_args) return response - @defer.inlineCallbacks - def _send_request( + async def _send_request( self, request, retry_on_dns_fail=True, @@ -311,7 +352,7 @@ class MatrixFederationHttpClient(object): backoff_on_404 (bool): Back off if we get a 404 Returns: - Deferred[twisted.web.client.Response]: resolves with the HTTP + twisted.web.client.Response: resolves with the HTTP response object on success. Raises: @@ -335,7 +376,7 @@ class MatrixFederationHttpClient(object): ): raise FederationDeniedError(request.destination) - limiter = yield synapse.util.retryutils.get_retry_limiter( + limiter = await synapse.util.retryutils.get_retry_limiter( request.destination, self.clock, self._store, @@ -376,9 +417,7 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - url_bytes = urllib.parse.urlunparse( - (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") - ) + url_bytes = request.uri url_str = url_bytes.decode("ascii") url_to_sign_bytes = urllib.parse.urlunparse( @@ -405,7 +444,7 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers - logger.info( + logger.debug( "{%s} [%s] Sending request: %s %s; timeout %fs", request.txn_id, request.destination, @@ -433,13 +472,10 @@ class MatrixFederationHttpClient(object): reactor=self.reactor, ) - response = yield request_deferred - except TimeoutError as e: - raise RequestSendFailed(e, can_retry=True) from e + response = await request_deferred except DNSLookupError as e: raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e except Exception as e: - logger.info("Failed to send request: %s", e) raise RequestSendFailed(e, can_retry=True) from e incoming_responses_counter.labels( @@ -447,6 +483,7 @@ class MatrixFederationHttpClient(object): ).inc() set_tag(tags.HTTP_STATUS_CODE, response.code) + response_phrase = response.phrase.decode("ascii", errors="replace") if 200 <= response.code < 300: logger.debug( @@ -454,7 +491,7 @@ class MatrixFederationHttpClient(object): request.txn_id, request.destination, response.code, - response.phrase.decode("ascii", errors="replace"), + response_phrase, ) pass else: @@ -463,7 +500,7 @@ class MatrixFederationHttpClient(object): request.txn_id, request.destination, response.code, - response.phrase.decode("ascii", errors="replace"), + response_phrase, ) # :'( # Update transactions table? @@ -473,7 +510,7 @@ class MatrixFederationHttpClient(object): ) try: - body = yield make_deferred_yieldable(d) + body = await make_deferred_yieldable(d) except Exception as e: # Eh, we're already going to raise an exception so lets # ignore if this fails. @@ -487,7 +524,7 @@ class MatrixFederationHttpClient(object): ) body = None - e = HttpResponseException(response.code, response.phrase, body) + e = HttpResponseException(response.code, response_phrase, body) # Retry if the error is a 429 (Too Many Requests), # otherwise just raise a standard HttpResponseException @@ -498,7 +535,7 @@ class MatrixFederationHttpClient(object): break except RequestSendFailed as e: - logger.warning( + logger.info( "{%s} [%s] Request failed: %s %s: %s", request.txn_id, request.destination, @@ -527,7 +564,7 @@ class MatrixFederationHttpClient(object): delay, ) - yield self.clock.sleep(delay) + await self.clock.sleep(delay) retries_left -= 1 else: raise @@ -590,8 +627,7 @@ class MatrixFederationHttpClient(object): ) return auth_headers - @defer.inlineCallbacks - def put_json( + async def put_json( self, destination, path, @@ -619,10 +655,14 @@ class MatrixFederationHttpClient(object): long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. backoff_on_404 (bool): True if we should count a 404 response as @@ -635,7 +675,7 @@ class MatrixFederationHttpClient(object): enabled. Returns: - Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + dict|list: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. Raises: @@ -657,7 +697,9 @@ class MatrixFederationHttpClient(object): json=data, ) - response = yield self._send_request_with_optional_trailing_slash( + start_ms = self.clock.time_msec() + + response = await self._send_request_with_optional_trailing_slash( request, try_trailing_slash_on_400, backoff_on_404=backoff_on_404, @@ -666,14 +708,18 @@ class MatrixFederationHttpClient(object): timeout=timeout, ) - body = yield _handle_json_response( - self.reactor, self.default_timeout, request, response + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + + body = await _handle_json_response( + self.reactor, _sec_timeout, request, response, start_ms ) return body - @defer.inlineCallbacks - def post_json( + async def post_json( self, destination, path, @@ -697,16 +743,20 @@ class MatrixFederationHttpClient(object): long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. args (dict): query params Returns: - Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + dict|list: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. Raises: @@ -724,7 +774,9 @@ class MatrixFederationHttpClient(object): method="POST", destination=destination, path=path, query=args, json=data ) - response = yield self._send_request( + start_ms = self.clock.time_msec() + + response = await self._send_request( request, long_retries=long_retries, timeout=timeout, @@ -736,13 +788,12 @@ class MatrixFederationHttpClient(object): else: _sec_timeout = self.default_timeout - body = yield _handle_json_response( - self.reactor, _sec_timeout, request, response + body = await _handle_json_response( + self.reactor, _sec_timeout, request, response, start_ms, ) return body - @defer.inlineCallbacks - def get_json( + async def get_json( self, destination, path, @@ -763,10 +814,14 @@ class MatrixFederationHttpClient(object): args (dict|None): A dictionary used to create query strings, defaults to None. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. @@ -774,7 +829,7 @@ class MatrixFederationHttpClient(object): response we should try appending a trailing slash to the end of the request. Workaround for #3622 in Synapse <= v0.99.3. Returns: - Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + dict|list: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. Raises: @@ -791,7 +846,9 @@ class MatrixFederationHttpClient(object): method="GET", destination=destination, path=path, query=args ) - response = yield self._send_request_with_optional_trailing_slash( + start_ms = self.clock.time_msec() + + response = await self._send_request_with_optional_trailing_slash( request, try_trailing_slash_on_400, backoff_on_404=False, @@ -800,14 +857,18 @@ class MatrixFederationHttpClient(object): timeout=timeout, ) - body = yield _handle_json_response( - self.reactor, self.default_timeout, request, response + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + + body = await _handle_json_response( + self.reactor, _sec_timeout, request, response, start_ms ) return body - @defer.inlineCallbacks - def delete_json( + async def delete_json( self, destination, path, @@ -826,16 +887,20 @@ class MatrixFederationHttpClient(object): long_retries (bool): whether to use the long retry algorithm. See docs on _send_request for details. - timeout (int|None): number of milliseconds to wait for the response headers - (including connecting to the server), *for each attempt*. + timeout (int|None): number of milliseconds to wait for the response. self._default_timeout (60s) by default. + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. args (dict): query params Returns: - Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + dict|list: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. Raises: @@ -852,20 +917,26 @@ class MatrixFederationHttpClient(object): method="DELETE", destination=destination, path=path, query=args ) - response = yield self._send_request( + start_ms = self.clock.time_msec() + + response = await self._send_request( request, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, ) - body = yield _handle_json_response( - self.reactor, self.default_timeout, request, response + if timeout is not None: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + + body = await _handle_json_response( + self.reactor, _sec_timeout, request, response, start_ms ) return body - @defer.inlineCallbacks - def get_file( + async def get_file( self, destination, path, @@ -885,7 +956,7 @@ class MatrixFederationHttpClient(object): and try the request anyway. Returns: - Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of + tuple[int, dict]: Resolves with an (int,dict) tuple of the file length and a dict of the response headers. Raises: @@ -902,7 +973,7 @@ class MatrixFederationHttpClient(object): method="GET", destination=destination, path=path, query=args ) - response = yield self._send_request( + response = await self._send_request( request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff ) @@ -911,7 +982,7 @@ class MatrixFederationHttpClient(object): try: d = _readBodyToFile(response, output_stream, max_size) d.addTimeout(self.default_timeout, self.reactor) - length = yield make_deferred_yieldable(d) + length = await make_deferred_yieldable(d) except Exception as e: logger.warning( "{%s} [%s] Error reading response: %s", @@ -921,12 +992,14 @@ class MatrixFederationHttpClient(object): ) raise logger.info( - "{%s} [%s] Completed: %d %s [%d bytes]", + "{%s} [%s] Completed: %d %s [%d bytes] %s %s", request.txn_id, request.destination, response.code, response.phrase.decode("ascii", errors="replace"), length, + request.method, + request.uri.decode("ascii"), ) return (length, headers)