diff options
author | Erik Johnston <erik@matrix.org> | 2020-08-11 22:03:14 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-08-11 22:03:14 +0100 |
commit | fdb46b5442c63212a52e2296491a23f1935f9929 (patch) | |
tree | c46d0940415f96e3c0383c35f1d39b7462d5c20c /synapse/http/matrixfederationclient.py | |
parent | Add comment explaining cast (diff) | |
parent | Auto set logging filter (#8051) (diff) | |
download | synapse-fdb46b5442c63212a52e2296491a23f1935f9929.tar.xz |
Merge remote-tracking branch 'origin/develop' into erikj/type_server
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r-- | synapse/http/matrixfederationclient.py | 94 |
1 files changed, 71 insertions, 23 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 2a6373937a..738be43f46 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -29,10 +29,11 @@ from zope.interface import implementer from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError -from twisted.internet.interfaces import IReactorPluggableNameResolver +from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone from twisted.web.http_headers import Headers +from twisted.web.iweb import IResponse import synapse.metrics import synapse.util.retryutils @@ -74,7 +75,7 @@ MAXINT = sys.maxsize _next_id = 1 -@attr.s +@attr.s(frozen=True) class MatrixFederationRequest(object): method = attr.ib() """HTTP method @@ -110,26 +111,52 @@ class MatrixFederationRequest(object): :type: str|None """ + uri = attr.ib(init=False, type=bytes) + """The URI of this request + """ + def __attrs_post_init__(self): global _next_id - self.txn_id = "%s-O-%s" % (self.method, _next_id) + txn_id = "%s-O-%s" % (self.method, _next_id) _next_id = (_next_id + 1) % (MAXINT - 1) + object.__setattr__(self, "txn_id", txn_id) + + destination_bytes = self.destination.encode("ascii") + path_bytes = self.path.encode("ascii") + if self.query: + query_bytes = encode_query_args(self.query) + else: + query_bytes = b"" + + # The object is frozen so we can pre-compute this. + uri = urllib.parse.urlunparse( + (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") + ) + object.__setattr__(self, "uri", uri) + def get_json(self): if self.json_callback: return self.json_callback() return self.json -async def _handle_json_response(reactor, timeout_sec, request, response): +async def _handle_json_response( + reactor: IReactorTime, + timeout_sec: float, + request: MatrixFederationRequest, + response: IResponse, + start_ms: int, +): """ Reads the JSON body of a response, with a timeout Args: - reactor (IReactor): twisted reactor, for the timeout - timeout_sec (float): number of seconds to wait for response to complete - request (MatrixFederationRequest): the request that triggered the response - response (IResponse): response to the request + reactor: twisted reactor, for the timeout + timeout_sec: number of seconds to wait for response to complete + request: the request that triggered the response + response: response to the request + start_ms: Timestamp when request was made Returns: dict: parsed JSON response @@ -143,23 +170,35 @@ async def _handle_json_response(reactor, timeout_sec, request, response): body = await make_deferred_yieldable(d) except TimeoutError as e: logger.warning( - "{%s} [%s] Timed out reading response", request.txn_id, request.destination, + "{%s} [%s] Timed out reading response - %s %s", + request.txn_id, + request.destination, + request.method, + request.uri.decode("ascii"), ) raise RequestSendFailed(e, can_retry=True) from e except Exception as e: logger.warning( - "{%s} [%s] Error reading response: %s", + "{%s} [%s] Error reading response %s %s: %s", request.txn_id, request.destination, + request.method, + request.uri.decode("ascii"), e, ) raise + + time_taken_secs = reactor.seconds() - start_ms / 1000 + logger.info( - "{%s} [%s] Completed: %d %s", + "{%s} [%s] Completed request: %d %s in %.2f secs - %s %s", request.txn_id, request.destination, response.code, response.phrase.decode("ascii", errors="replace"), + time_taken_secs, + request.method, + request.uri.decode("ascii"), ) return body @@ -261,7 +300,9 @@ class MatrixFederationHttpClient(object): # 'M_UNRECOGNIZED' which some endpoints can return when omitting a # trailing slash on Synapse <= v0.99.3. logger.info("Retrying request with trailing slash") - request.path += "/" + + # Request is frozen so we create a new instance + request = attr.evolve(request, path=request.path + "/") response = await self._send_request(request, **send_request_args) @@ -373,9 +414,7 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - url_bytes = urllib.parse.urlunparse( - (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") - ) + url_bytes = request.uri url_str = url_bytes.decode("ascii") url_to_sign_bytes = urllib.parse.urlunparse( @@ -402,7 +441,7 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers - logger.info( + logger.debug( "{%s} [%s] Sending request: %s %s; timeout %fs", request.txn_id, request.destination, @@ -436,7 +475,6 @@ class MatrixFederationHttpClient(object): except DNSLookupError as e: raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e except Exception as e: - logger.info("Failed to send request: %s", e) raise RequestSendFailed(e, can_retry=True) from e incoming_responses_counter.labels( @@ -496,7 +534,7 @@ class MatrixFederationHttpClient(object): break except RequestSendFailed as e: - logger.warning( + logger.info( "{%s} [%s] Request failed: %s %s: %s", request.txn_id, request.destination, @@ -654,6 +692,8 @@ class MatrixFederationHttpClient(object): json=data, ) + start_ms = self.clock.time_msec() + response = await self._send_request_with_optional_trailing_slash( request, try_trailing_slash_on_400, @@ -664,7 +704,7 @@ class MatrixFederationHttpClient(object): ) body = await _handle_json_response( - self.reactor, self.default_timeout, request, response + self.reactor, self.default_timeout, request, response, start_ms ) return body @@ -720,6 +760,8 @@ class MatrixFederationHttpClient(object): method="POST", destination=destination, path=path, query=args, json=data ) + start_ms = self.clock.time_msec() + response = await self._send_request( request, long_retries=long_retries, @@ -733,7 +775,7 @@ class MatrixFederationHttpClient(object): _sec_timeout = self.default_timeout body = await _handle_json_response( - self.reactor, _sec_timeout, request, response + self.reactor, _sec_timeout, request, response, start_ms, ) return body @@ -786,6 +828,8 @@ class MatrixFederationHttpClient(object): method="GET", destination=destination, path=path, query=args ) + start_ms = self.clock.time_msec() + response = await self._send_request_with_optional_trailing_slash( request, try_trailing_slash_on_400, @@ -796,7 +840,7 @@ class MatrixFederationHttpClient(object): ) body = await _handle_json_response( - self.reactor, self.default_timeout, request, response + self.reactor, self.default_timeout, request, response, start_ms ) return body @@ -846,6 +890,8 @@ class MatrixFederationHttpClient(object): method="DELETE", destination=destination, path=path, query=args ) + start_ms = self.clock.time_msec() + response = await self._send_request( request, long_retries=long_retries, @@ -854,7 +900,7 @@ class MatrixFederationHttpClient(object): ) body = await _handle_json_response( - self.reactor, self.default_timeout, request, response + self.reactor, self.default_timeout, request, response, start_ms ) return body @@ -914,12 +960,14 @@ class MatrixFederationHttpClient(object): ) raise logger.info( - "{%s} [%s] Completed: %d %s [%d bytes]", + "{%s} [%s] Completed: %d %s [%d bytes] %s %s", request.txn_id, request.destination, response.code, response.phrase.decode("ascii", errors="replace"), length, + request.method, + request.uri.decode("ascii"), ) return (length, headers) |