diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/http/matrixfederationclient.py | 242 |
1 files changed, 121 insertions, 121 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 663ea72a7a..5ef8bb60a3 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -54,10 +54,12 @@ from synapse.util.metrics import Measure logger = logging.getLogger(__name__) -outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", - "", ["method"]) -incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses", - "", ["method", "code"]) +outgoing_requests_counter = Counter( + "synapse_http_matrixfederationclient_requests", "", ["method"] +) +incoming_responses_counter = Counter( + "synapse_http_matrixfederationclient_responses", "", ["method", "code"] +) MAX_LONG_RETRIES = 10 @@ -137,11 +139,7 @@ def _handle_json_response(reactor, timeout_sec, request, response): check_content_type_is_json(response.headers) d = treq.json_content(response) - d = timeout_deferred( - d, - timeout=timeout_sec, - reactor=reactor, - ) + d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) body = yield make_deferred_yieldable(d) except Exception as e: @@ -157,7 +155,7 @@ def _handle_json_response(reactor, timeout_sec, request, response): request.txn_id, request.destination, response.code, - response.phrase.decode('ascii', errors='replace'), + response.phrase.decode("ascii", errors="replace"), ) defer.returnValue(body) @@ -181,7 +179,7 @@ class MatrixFederationHttpClient(object): # We need to use a DNS resolver which filters out blacklisted IP # addresses, to prevent DNS rebinding. nameResolver = IPBlacklistingResolver( - real_reactor, None, hs.config.federation_ip_range_blacklist, + real_reactor, None, hs.config.federation_ip_range_blacklist ) @implementer(IReactorPluggableNameResolver) @@ -194,21 +192,19 @@ class MatrixFederationHttpClient(object): self.reactor = Reactor() - self.agent = MatrixFederationAgent( - self.reactor, - tls_client_options_factory, - ) + self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory) # Use a BlacklistingAgentWrapper to prevent circumventing the IP # blacklist via IP literals in server names self.agent = BlacklistingAgentWrapper( - self.agent, self.reactor, + self.agent, + self.reactor, ip_blacklist=hs.config.federation_ip_range_blacklist, ) self.clock = hs.get_clock() self._store = hs.get_datastore() - self.version_string_bytes = hs.version_string.encode('ascii') + self.version_string_bytes = hs.version_string.encode("ascii") self.default_timeout = 60 def schedule(x): @@ -218,10 +214,7 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def _send_request_with_optional_trailing_slash( - self, - request, - try_trailing_slash_on_400=False, - **send_request_args + self, request, try_trailing_slash_on_400=False, **send_request_args ): """Wrapper for _send_request which can optionally retry the request upon receiving a combination of a 400 HTTP response code and a @@ -244,9 +237,7 @@ class MatrixFederationHttpClient(object): Deferred[Dict]: Parsed JSON response body. """ try: - response = yield self._send_request( - request, **send_request_args - ) + response = yield self._send_request(request, **send_request_args) except HttpResponseException as e: # Received an HTTP error > 300. Check if it meets the requirements # to retry with a trailing slash @@ -262,9 +253,7 @@ class MatrixFederationHttpClient(object): logger.info("Retrying request with trailing slash") request.path += "/" - response = yield self._send_request( - request, **send_request_args - ) + response = yield self._send_request(request, **send_request_args) defer.returnValue(response) @@ -329,8 +318,8 @@ class MatrixFederationHttpClient(object): _sec_timeout = self.default_timeout if ( - self.hs.config.federation_domain_whitelist is not None and - request.destination not in self.hs.config.federation_domain_whitelist + self.hs.config.federation_domain_whitelist is not None + and request.destination not in self.hs.config.federation_domain_whitelist ): raise FederationDeniedError(request.destination) @@ -350,9 +339,7 @@ class MatrixFederationHttpClient(object): else: query_bytes = b"" - headers_dict = { - b"User-Agent": [self.version_string_bytes], - } + headers_dict = {b"User-Agent": [self.version_string_bytes]} with limiter: # XXX: Would be much nicer to retry only at the transaction-layer @@ -362,16 +349,14 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - url_bytes = urllib.parse.urlunparse(( - b"matrix", destination_bytes, - path_bytes, None, query_bytes, b"", - )) - url_str = url_bytes.decode('ascii') + url_bytes = urllib.parse.urlunparse( + (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") + ) + url_str = url_bytes.decode("ascii") - url_to_sign_bytes = urllib.parse.urlunparse(( - b"", b"", - path_bytes, None, query_bytes, b"", - )) + url_to_sign_bytes = urllib.parse.urlunparse( + (b"", b"", path_bytes, None, query_bytes, b"") + ) while True: try: @@ -379,26 +364,27 @@ class MatrixFederationHttpClient(object): if json: headers_dict[b"Content-Type"] = [b"application/json"] auth_headers = self.build_auth_headers( - destination_bytes, method_bytes, url_to_sign_bytes, - json, + destination_bytes, method_bytes, url_to_sign_bytes, json ) data = encode_canonical_json(json) producer = QuieterFileBodyProducer( - BytesIO(data), - cooperator=self._cooperator, + BytesIO(data), cooperator=self._cooperator ) else: producer = None auth_headers = self.build_auth_headers( - destination_bytes, method_bytes, url_to_sign_bytes, + destination_bytes, method_bytes, url_to_sign_bytes ) headers_dict[b"Authorization"] = auth_headers logger.info( "{%s} [%s] Sending request: %s %s; timeout %fs", - request.txn_id, request.destination, request.method, - url_str, _sec_timeout, + request.txn_id, + request.destination, + request.method, + url_str, + _sec_timeout, ) try: @@ -430,7 +416,7 @@ class MatrixFederationHttpClient(object): request.txn_id, request.destination, response.code, - response.phrase.decode('ascii', errors='replace'), + response.phrase.decode("ascii", errors="replace"), ) if 200 <= response.code < 300: @@ -440,9 +426,7 @@ class MatrixFederationHttpClient(object): # Update transactions table? d = treq.content(response) d = timeout_deferred( - d, - timeout=_sec_timeout, - reactor=self.reactor, + d, timeout=_sec_timeout, reactor=self.reactor ) try: @@ -460,9 +444,7 @@ class MatrixFederationHttpClient(object): ) body = None - e = HttpResponseException( - response.code, response.phrase, body - ) + e = HttpResponseException(response.code, response.phrase, body) # Retry if the error is a 429 (Too Many Requests), # otherwise just raise a standard HttpResponseException @@ -521,7 +503,7 @@ class MatrixFederationHttpClient(object): defer.returnValue(response) def build_auth_headers( - self, destination, method, url_bytes, content=None, destination_is=None, + self, destination, method, url_bytes, content=None, destination_is=None ): """ Builds the Authorization headers for a federation request @@ -538,11 +520,7 @@ class MatrixFederationHttpClient(object): Returns: list[bytes]: a list of headers to be added as "Authorization:" headers """ - request = { - "method": method, - "uri": url_bytes, - "origin": self.server_name, - } + request = {"method": method, "uri": url_bytes, "origin": self.server_name} if destination is not None: request["destination"] = destination @@ -558,20 +536,28 @@ class MatrixFederationHttpClient(object): auth_headers = [] for key, sig in request["signatures"][self.server_name].items(): - auth_headers.append(( - "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % ( - self.server_name, key, sig, - )).encode('ascii') + auth_headers.append( + ( + 'X-Matrix origin=%s,key="%s",sig="%s"' + % (self.server_name, key, sig) + ).encode("ascii") ) return auth_headers @defer.inlineCallbacks - def put_json(self, destination, path, args={}, data={}, - json_data_callback=None, - long_retries=False, timeout=None, - ignore_backoff=False, - backoff_on_404=False, - try_trailing_slash_on_400=False): + def put_json( + self, + destination, + path, + args={}, + data={}, + json_data_callback=None, + long_retries=False, + timeout=None, + ignore_backoff=False, + backoff_on_404=False, + try_trailing_slash_on_400=False, + ): """ Sends the specifed json data using PUT Args: @@ -635,14 +621,22 @@ class MatrixFederationHttpClient(object): ) body = yield _handle_json_response( - self.reactor, self.default_timeout, request, response, + self.reactor, self.default_timeout, request, response ) defer.returnValue(body) @defer.inlineCallbacks - def post_json(self, destination, path, data={}, long_retries=False, - timeout=None, ignore_backoff=False, args={}): + def post_json( + self, + destination, + path, + data={}, + long_retries=False, + timeout=None, + ignore_backoff=False, + args={}, + ): """ Sends the specifed json data using POST Args: @@ -681,11 +675,7 @@ class MatrixFederationHttpClient(object): """ request = MatrixFederationRequest( - method="POST", - destination=destination, - path=path, - query=args, - json=data, + method="POST", destination=destination, path=path, query=args, json=data ) response = yield self._send_request( @@ -701,14 +691,21 @@ class MatrixFederationHttpClient(object): _sec_timeout = self.default_timeout body = yield _handle_json_response( - self.reactor, _sec_timeout, request, response, + self.reactor, _sec_timeout, request, response ) defer.returnValue(body) @defer.inlineCallbacks - def get_json(self, destination, path, args=None, retry_on_dns_fail=True, - timeout=None, ignore_backoff=False, - try_trailing_slash_on_400=False): + def get_json( + self, + destination, + path, + args=None, + retry_on_dns_fail=True, + timeout=None, + ignore_backoff=False, + try_trailing_slash_on_400=False, + ): """ GETs some json from the given host homeserver and path Args: @@ -745,10 +742,7 @@ class MatrixFederationHttpClient(object): remote, due to e.g. DNS failures, connection timeouts etc. """ request = MatrixFederationRequest( - method="GET", - destination=destination, - path=path, - query=args, + method="GET", destination=destination, path=path, query=args ) response = yield self._send_request_with_optional_trailing_slash( @@ -761,14 +755,21 @@ class MatrixFederationHttpClient(object): ) body = yield _handle_json_response( - self.reactor, self.default_timeout, request, response, + self.reactor, self.default_timeout, request, response ) defer.returnValue(body) @defer.inlineCallbacks - def delete_json(self, destination, path, long_retries=False, - timeout=None, ignore_backoff=False, args={}): + def delete_json( + self, + destination, + path, + long_retries=False, + timeout=None, + ignore_backoff=False, + args={}, + ): """Send a DELETE request to the remote expecting some json response Args: @@ -802,10 +803,7 @@ class MatrixFederationHttpClient(object): remote, due to e.g. DNS failures, connection timeouts etc. """ request = MatrixFederationRequest( - method="DELETE", - destination=destination, - path=path, - query=args, + method="DELETE", destination=destination, path=path, query=args ) response = yield self._send_request( @@ -816,14 +814,21 @@ class MatrixFederationHttpClient(object): ) body = yield _handle_json_response( - self.reactor, self.default_timeout, request, response, + self.reactor, self.default_timeout, request, response ) defer.returnValue(body) @defer.inlineCallbacks - def get_file(self, destination, path, output_stream, args={}, - retry_on_dns_fail=True, max_size=None, - ignore_backoff=False): + def get_file( + self, + destination, + path, + output_stream, + args={}, + retry_on_dns_fail=True, + max_size=None, + ignore_backoff=False, + ): """GETs a file from a given homeserver Args: destination (str): The remote server to send the HTTP request to. @@ -848,16 +853,11 @@ class MatrixFederationHttpClient(object): remote, due to e.g. DNS failures, connection timeouts etc. """ request = MatrixFederationRequest( - method="GET", - destination=destination, - path=path, - query=args, + method="GET", destination=destination, path=path, query=args ) response = yield self._send_request( - request, - retry_on_dns_fail=retry_on_dns_fail, - ignore_backoff=ignore_backoff, + request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff ) headers = dict(response.headers.getAllRawHeaders()) @@ -879,7 +879,7 @@ class MatrixFederationHttpClient(object): request.txn_id, request.destination, response.code, - response.phrase.decode('ascii', errors='replace'), + response.phrase.decode("ascii", errors="replace"), length, ) defer.returnValue((length, headers)) @@ -896,11 +896,13 @@ class _ReadBodyToFileProtocol(protocol.Protocol): self.stream.write(data) self.length += len(data) if self.max_size is not None and self.length >= self.max_size: - self.deferred.errback(SynapseError( - 502, - "Requested file is too large > %r bytes" % (self.max_size,), - Codes.TOO_LARGE, - )) + self.deferred.errback( + SynapseError( + 502, + "Requested file is too large > %r bytes" % (self.max_size,), + Codes.TOO_LARGE, + ) + ) self.deferred = defer.Deferred() self.transport.loseConnection() @@ -920,8 +922,7 @@ def _readBodyToFile(response, stream, max_size): def _flatten_response_never_received(e): if hasattr(e, "reasons"): reasons = ", ".join( - _flatten_response_never_received(f.value) - for f in e.reasons + _flatten_response_never_received(f.value) for f in e.reasons ) return "%s:[%s]" % (type(e).__name__, reasons) @@ -943,16 +944,15 @@ def check_content_type_is_json(headers): """ c_type = headers.getRawHeaders(b"Content-Type") if c_type is None: - raise RequestSendFailed(RuntimeError( - "No Content-Type header" - ), can_retry=False) + raise RequestSendFailed(RuntimeError("No Content-Type header"), can_retry=False) - c_type = c_type[0].decode('ascii') # only the first header + c_type = c_type[0].decode("ascii") # only the first header val, options = cgi.parse_header(c_type) if val != "application/json": - raise RequestSendFailed(RuntimeError( - "Content-Type not application/json: was '%s'" % c_type - ), can_retry=False) + raise RequestSendFailed( + RuntimeError("Content-Type not application/json: was '%s'" % c_type), + can_retry=False, + ) def encode_query_args(args): @@ -967,4 +967,4 @@ def encode_query_args(args): query_bytes = urllib.parse.urlencode(encoded_args, True) - return query_bytes.encode('utf8') + return query_bytes.encode("utf8") |