diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 663ea72a7a..5ef8bb60a3 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -54,10 +54,12 @@ from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
-outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
- "", ["method"])
-incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
- "", ["method", "code"])
+outgoing_requests_counter = Counter(
+ "synapse_http_matrixfederationclient_requests", "", ["method"]
+)
+incoming_responses_counter = Counter(
+ "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
+)
MAX_LONG_RETRIES = 10
@@ -137,11 +139,7 @@ def _handle_json_response(reactor, timeout_sec, request, response):
check_content_type_is_json(response.headers)
d = treq.json_content(response)
- d = timeout_deferred(
- d,
- timeout=timeout_sec,
- reactor=reactor,
- )
+ d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
body = yield make_deferred_yieldable(d)
except Exception as e:
@@ -157,7 +155,7 @@ def _handle_json_response(reactor, timeout_sec, request, response):
request.txn_id,
request.destination,
response.code,
- response.phrase.decode('ascii', errors='replace'),
+ response.phrase.decode("ascii", errors="replace"),
)
defer.returnValue(body)
@@ -181,7 +179,7 @@ class MatrixFederationHttpClient(object):
# We need to use a DNS resolver which filters out blacklisted IP
# addresses, to prevent DNS rebinding.
nameResolver = IPBlacklistingResolver(
- real_reactor, None, hs.config.federation_ip_range_blacklist,
+ real_reactor, None, hs.config.federation_ip_range_blacklist
)
@implementer(IReactorPluggableNameResolver)
@@ -194,21 +192,19 @@ class MatrixFederationHttpClient(object):
self.reactor = Reactor()
- self.agent = MatrixFederationAgent(
- self.reactor,
- tls_client_options_factory,
- )
+ self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory)
# Use a BlacklistingAgentWrapper to prevent circumventing the IP
# blacklist via IP literals in server names
self.agent = BlacklistingAgentWrapper(
- self.agent, self.reactor,
+ self.agent,
+ self.reactor,
ip_blacklist=hs.config.federation_ip_range_blacklist,
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
- self.version_string_bytes = hs.version_string.encode('ascii')
+ self.version_string_bytes = hs.version_string.encode("ascii")
self.default_timeout = 60
def schedule(x):
@@ -218,10 +214,7 @@ class MatrixFederationHttpClient(object):
@defer.inlineCallbacks
def _send_request_with_optional_trailing_slash(
- self,
- request,
- try_trailing_slash_on_400=False,
- **send_request_args
+ self, request, try_trailing_slash_on_400=False, **send_request_args
):
"""Wrapper for _send_request which can optionally retry the request
upon receiving a combination of a 400 HTTP response code and a
@@ -244,9 +237,7 @@ class MatrixFederationHttpClient(object):
Deferred[Dict]: Parsed JSON response body.
"""
try:
- response = yield self._send_request(
- request, **send_request_args
- )
+ response = yield self._send_request(request, **send_request_args)
except HttpResponseException as e:
# Received an HTTP error > 300. Check if it meets the requirements
# to retry with a trailing slash
@@ -262,9 +253,7 @@ class MatrixFederationHttpClient(object):
logger.info("Retrying request with trailing slash")
request.path += "/"
- response = yield self._send_request(
- request, **send_request_args
- )
+ response = yield self._send_request(request, **send_request_args)
defer.returnValue(response)
@@ -329,8 +318,8 @@ class MatrixFederationHttpClient(object):
_sec_timeout = self.default_timeout
if (
- self.hs.config.federation_domain_whitelist is not None and
- request.destination not in self.hs.config.federation_domain_whitelist
+ self.hs.config.federation_domain_whitelist is not None
+ and request.destination not in self.hs.config.federation_domain_whitelist
):
raise FederationDeniedError(request.destination)
@@ -350,9 +339,7 @@ class MatrixFederationHttpClient(object):
else:
query_bytes = b""
- headers_dict = {
- b"User-Agent": [self.version_string_bytes],
- }
+ headers_dict = {b"User-Agent": [self.version_string_bytes]}
with limiter:
# XXX: Would be much nicer to retry only at the transaction-layer
@@ -362,16 +349,14 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
- url_bytes = urllib.parse.urlunparse((
- b"matrix", destination_bytes,
- path_bytes, None, query_bytes, b"",
- ))
- url_str = url_bytes.decode('ascii')
+ url_bytes = urllib.parse.urlunparse(
+ (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
+ )
+ url_str = url_bytes.decode("ascii")
- url_to_sign_bytes = urllib.parse.urlunparse((
- b"", b"",
- path_bytes, None, query_bytes, b"",
- ))
+ url_to_sign_bytes = urllib.parse.urlunparse(
+ (b"", b"", path_bytes, None, query_bytes, b"")
+ )
while True:
try:
@@ -379,26 +364,27 @@ class MatrixFederationHttpClient(object):
if json:
headers_dict[b"Content-Type"] = [b"application/json"]
auth_headers = self.build_auth_headers(
- destination_bytes, method_bytes, url_to_sign_bytes,
- json,
+ destination_bytes, method_bytes, url_to_sign_bytes, json
)
data = encode_canonical_json(json)
producer = QuieterFileBodyProducer(
- BytesIO(data),
- cooperator=self._cooperator,
+ BytesIO(data), cooperator=self._cooperator
)
else:
producer = None
auth_headers = self.build_auth_headers(
- destination_bytes, method_bytes, url_to_sign_bytes,
+ destination_bytes, method_bytes, url_to_sign_bytes
)
headers_dict[b"Authorization"] = auth_headers
logger.info(
"{%s} [%s] Sending request: %s %s; timeout %fs",
- request.txn_id, request.destination, request.method,
- url_str, _sec_timeout,
+ request.txn_id,
+ request.destination,
+ request.method,
+ url_str,
+ _sec_timeout,
)
try:
@@ -430,7 +416,7 @@ class MatrixFederationHttpClient(object):
request.txn_id,
request.destination,
response.code,
- response.phrase.decode('ascii', errors='replace'),
+ response.phrase.decode("ascii", errors="replace"),
)
if 200 <= response.code < 300:
@@ -440,9 +426,7 @@ class MatrixFederationHttpClient(object):
# Update transactions table?
d = treq.content(response)
d = timeout_deferred(
- d,
- timeout=_sec_timeout,
- reactor=self.reactor,
+ d, timeout=_sec_timeout, reactor=self.reactor
)
try:
@@ -460,9 +444,7 @@ class MatrixFederationHttpClient(object):
)
body = None
- e = HttpResponseException(
- response.code, response.phrase, body
- )
+ e = HttpResponseException(response.code, response.phrase, body)
# Retry if the error is a 429 (Too Many Requests),
# otherwise just raise a standard HttpResponseException
@@ -521,7 +503,7 @@ class MatrixFederationHttpClient(object):
defer.returnValue(response)
def build_auth_headers(
- self, destination, method, url_bytes, content=None, destination_is=None,
+ self, destination, method, url_bytes, content=None, destination_is=None
):
"""
Builds the Authorization headers for a federation request
@@ -538,11 +520,7 @@ class MatrixFederationHttpClient(object):
Returns:
list[bytes]: a list of headers to be added as "Authorization:" headers
"""
- request = {
- "method": method,
- "uri": url_bytes,
- "origin": self.server_name,
- }
+ request = {"method": method, "uri": url_bytes, "origin": self.server_name}
if destination is not None:
request["destination"] = destination
@@ -558,20 +536,28 @@ class MatrixFederationHttpClient(object):
auth_headers = []
for key, sig in request["signatures"][self.server_name].items():
- auth_headers.append((
- "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
- self.server_name, key, sig,
- )).encode('ascii')
+ auth_headers.append(
+ (
+ 'X-Matrix origin=%s,key="%s",sig="%s"'
+ % (self.server_name, key, sig)
+ ).encode("ascii")
)
return auth_headers
@defer.inlineCallbacks
- def put_json(self, destination, path, args={}, data={},
- json_data_callback=None,
- long_retries=False, timeout=None,
- ignore_backoff=False,
- backoff_on_404=False,
- try_trailing_slash_on_400=False):
+ def put_json(
+ self,
+ destination,
+ path,
+ args={},
+ data={},
+ json_data_callback=None,
+ long_retries=False,
+ timeout=None,
+ ignore_backoff=False,
+ backoff_on_404=False,
+ try_trailing_slash_on_400=False,
+ ):
""" Sends the specifed json data using PUT
Args:
@@ -635,14 +621,22 @@ class MatrixFederationHttpClient(object):
)
body = yield _handle_json_response(
- self.reactor, self.default_timeout, request, response,
+ self.reactor, self.default_timeout, request, response
)
defer.returnValue(body)
@defer.inlineCallbacks
- def post_json(self, destination, path, data={}, long_retries=False,
- timeout=None, ignore_backoff=False, args={}):
+ def post_json(
+ self,
+ destination,
+ path,
+ data={},
+ long_retries=False,
+ timeout=None,
+ ignore_backoff=False,
+ args={},
+ ):
""" Sends the specifed json data using POST
Args:
@@ -681,11 +675,7 @@ class MatrixFederationHttpClient(object):
"""
request = MatrixFederationRequest(
- method="POST",
- destination=destination,
- path=path,
- query=args,
- json=data,
+ method="POST", destination=destination, path=path, query=args, json=data
)
response = yield self._send_request(
@@ -701,14 +691,21 @@ class MatrixFederationHttpClient(object):
_sec_timeout = self.default_timeout
body = yield _handle_json_response(
- self.reactor, _sec_timeout, request, response,
+ self.reactor, _sec_timeout, request, response
)
defer.returnValue(body)
@defer.inlineCallbacks
- def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
- timeout=None, ignore_backoff=False,
- try_trailing_slash_on_400=False):
+ def get_json(
+ self,
+ destination,
+ path,
+ args=None,
+ retry_on_dns_fail=True,
+ timeout=None,
+ ignore_backoff=False,
+ try_trailing_slash_on_400=False,
+ ):
""" GETs some json from the given host homeserver and path
Args:
@@ -745,10 +742,7 @@ class MatrixFederationHttpClient(object):
remote, due to e.g. DNS failures, connection timeouts etc.
"""
request = MatrixFederationRequest(
- method="GET",
- destination=destination,
- path=path,
- query=args,
+ method="GET", destination=destination, path=path, query=args
)
response = yield self._send_request_with_optional_trailing_slash(
@@ -761,14 +755,21 @@ class MatrixFederationHttpClient(object):
)
body = yield _handle_json_response(
- self.reactor, self.default_timeout, request, response,
+ self.reactor, self.default_timeout, request, response
)
defer.returnValue(body)
@defer.inlineCallbacks
- def delete_json(self, destination, path, long_retries=False,
- timeout=None, ignore_backoff=False, args={}):
+ def delete_json(
+ self,
+ destination,
+ path,
+ long_retries=False,
+ timeout=None,
+ ignore_backoff=False,
+ args={},
+ ):
"""Send a DELETE request to the remote expecting some json response
Args:
@@ -802,10 +803,7 @@ class MatrixFederationHttpClient(object):
remote, due to e.g. DNS failures, connection timeouts etc.
"""
request = MatrixFederationRequest(
- method="DELETE",
- destination=destination,
- path=path,
- query=args,
+ method="DELETE", destination=destination, path=path, query=args
)
response = yield self._send_request(
@@ -816,14 +814,21 @@ class MatrixFederationHttpClient(object):
)
body = yield _handle_json_response(
- self.reactor, self.default_timeout, request, response,
+ self.reactor, self.default_timeout, request, response
)
defer.returnValue(body)
@defer.inlineCallbacks
- def get_file(self, destination, path, output_stream, args={},
- retry_on_dns_fail=True, max_size=None,
- ignore_backoff=False):
+ def get_file(
+ self,
+ destination,
+ path,
+ output_stream,
+ args={},
+ retry_on_dns_fail=True,
+ max_size=None,
+ ignore_backoff=False,
+ ):
"""GETs a file from a given homeserver
Args:
destination (str): The remote server to send the HTTP request to.
@@ -848,16 +853,11 @@ class MatrixFederationHttpClient(object):
remote, due to e.g. DNS failures, connection timeouts etc.
"""
request = MatrixFederationRequest(
- method="GET",
- destination=destination,
- path=path,
- query=args,
+ method="GET", destination=destination, path=path, query=args
)
response = yield self._send_request(
- request,
- retry_on_dns_fail=retry_on_dns_fail,
- ignore_backoff=ignore_backoff,
+ request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
)
headers = dict(response.headers.getAllRawHeaders())
@@ -879,7 +879,7 @@ class MatrixFederationHttpClient(object):
request.txn_id,
request.destination,
response.code,
- response.phrase.decode('ascii', errors='replace'),
+ response.phrase.decode("ascii", errors="replace"),
length,
)
defer.returnValue((length, headers))
@@ -896,11 +896,13 @@ class _ReadBodyToFileProtocol(protocol.Protocol):
self.stream.write(data)
self.length += len(data)
if self.max_size is not None and self.length >= self.max_size:
- self.deferred.errback(SynapseError(
- 502,
- "Requested file is too large > %r bytes" % (self.max_size,),
- Codes.TOO_LARGE,
- ))
+ self.deferred.errback(
+ SynapseError(
+ 502,
+ "Requested file is too large > %r bytes" % (self.max_size,),
+ Codes.TOO_LARGE,
+ )
+ )
self.deferred = defer.Deferred()
self.transport.loseConnection()
@@ -920,8 +922,7 @@ def _readBodyToFile(response, stream, max_size):
def _flatten_response_never_received(e):
if hasattr(e, "reasons"):
reasons = ", ".join(
- _flatten_response_never_received(f.value)
- for f in e.reasons
+ _flatten_response_never_received(f.value) for f in e.reasons
)
return "%s:[%s]" % (type(e).__name__, reasons)
@@ -943,16 +944,15 @@ def check_content_type_is_json(headers):
"""
c_type = headers.getRawHeaders(b"Content-Type")
if c_type is None:
- raise RequestSendFailed(RuntimeError(
- "No Content-Type header"
- ), can_retry=False)
+ raise RequestSendFailed(RuntimeError("No Content-Type header"), can_retry=False)
- c_type = c_type[0].decode('ascii') # only the first header
+ c_type = c_type[0].decode("ascii") # only the first header
val, options = cgi.parse_header(c_type)
if val != "application/json":
- raise RequestSendFailed(RuntimeError(
- "Content-Type not application/json: was '%s'" % c_type
- ), can_retry=False)
+ raise RequestSendFailed(
+ RuntimeError("Content-Type not application/json: was '%s'" % c_type),
+ can_retry=False,
+ )
def encode_query_args(args):
@@ -967,4 +967,4 @@ def encode_query_args(args):
query_bytes = urllib.parse.urlencode(encoded_args, True)
- return query_bytes.encode('utf8')
+ return query_bytes.encode("utf8")
|