summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-06-20 11:59:14 +0100
committerErik Johnston <erik@matrix.org>2019-06-20 11:59:14 +0100
commit45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3 (patch)
tree07bb21377c6611db89f64f948a2e27645662ff0e /synapse/http/matrixfederationclient.py
parentAdd descriptions and remove redundant set(..) (diff)
parentRun Black. (#5482) (diff)
downloadsynapse-45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/histogram_extremities
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py242
1 files changed, 121 insertions, 121 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 663ea72a7a..5ef8bb60a3 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -54,10 +54,12 @@ from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
-outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
-                                    "", ["method"])
-incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
-                                     "", ["method", "code"])
+outgoing_requests_counter = Counter(
+    "synapse_http_matrixfederationclient_requests", "", ["method"]
+)
+incoming_responses_counter = Counter(
+    "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
+)
 
 
 MAX_LONG_RETRIES = 10
@@ -137,11 +139,7 @@ def _handle_json_response(reactor, timeout_sec, request, response):
         check_content_type_is_json(response.headers)
 
         d = treq.json_content(response)
-        d = timeout_deferred(
-            d,
-            timeout=timeout_sec,
-            reactor=reactor,
-        )
+        d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
 
         body = yield make_deferred_yieldable(d)
     except Exception as e:
@@ -157,7 +155,7 @@ def _handle_json_response(reactor, timeout_sec, request, response):
         request.txn_id,
         request.destination,
         response.code,
-        response.phrase.decode('ascii', errors='replace'),
+        response.phrase.decode("ascii", errors="replace"),
     )
     defer.returnValue(body)
 
@@ -181,7 +179,7 @@ class MatrixFederationHttpClient(object):
         # We need to use a DNS resolver which filters out blacklisted IP
         # addresses, to prevent DNS rebinding.
         nameResolver = IPBlacklistingResolver(
-            real_reactor, None, hs.config.federation_ip_range_blacklist,
+            real_reactor, None, hs.config.federation_ip_range_blacklist
         )
 
         @implementer(IReactorPluggableNameResolver)
@@ -194,21 +192,19 @@ class MatrixFederationHttpClient(object):
 
         self.reactor = Reactor()
 
-        self.agent = MatrixFederationAgent(
-            self.reactor,
-            tls_client_options_factory,
-        )
+        self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory)
 
         # Use a BlacklistingAgentWrapper to prevent circumventing the IP
         # blacklist via IP literals in server names
         self.agent = BlacklistingAgentWrapper(
-            self.agent, self.reactor,
+            self.agent,
+            self.reactor,
             ip_blacklist=hs.config.federation_ip_range_blacklist,
         )
 
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
-        self.version_string_bytes = hs.version_string.encode('ascii')
+        self.version_string_bytes = hs.version_string.encode("ascii")
         self.default_timeout = 60
 
         def schedule(x):
@@ -218,10 +214,7 @@ class MatrixFederationHttpClient(object):
 
     @defer.inlineCallbacks
     def _send_request_with_optional_trailing_slash(
-        self,
-        request,
-        try_trailing_slash_on_400=False,
-        **send_request_args
+        self, request, try_trailing_slash_on_400=False, **send_request_args
     ):
         """Wrapper for _send_request which can optionally retry the request
         upon receiving a combination of a 400 HTTP response code and a
@@ -244,9 +237,7 @@ class MatrixFederationHttpClient(object):
             Deferred[Dict]: Parsed JSON response body.
         """
         try:
-            response = yield self._send_request(
-                request, **send_request_args
-            )
+            response = yield self._send_request(request, **send_request_args)
         except HttpResponseException as e:
             # Received an HTTP error > 300. Check if it meets the requirements
             # to retry with a trailing slash
@@ -262,9 +253,7 @@ class MatrixFederationHttpClient(object):
             logger.info("Retrying request with trailing slash")
             request.path += "/"
 
-            response = yield self._send_request(
-                request, **send_request_args
-            )
+            response = yield self._send_request(request, **send_request_args)
 
         defer.returnValue(response)
 
@@ -329,8 +318,8 @@ class MatrixFederationHttpClient(object):
             _sec_timeout = self.default_timeout
 
         if (
-            self.hs.config.federation_domain_whitelist is not None and
-            request.destination not in self.hs.config.federation_domain_whitelist
+            self.hs.config.federation_domain_whitelist is not None
+            and request.destination not in self.hs.config.federation_domain_whitelist
         ):
             raise FederationDeniedError(request.destination)
 
@@ -350,9 +339,7 @@ class MatrixFederationHttpClient(object):
         else:
             query_bytes = b""
 
-        headers_dict = {
-            b"User-Agent": [self.version_string_bytes],
-        }
+        headers_dict = {b"User-Agent": [self.version_string_bytes]}
 
         with limiter:
             # XXX: Would be much nicer to retry only at the transaction-layer
@@ -362,16 +349,14 @@ class MatrixFederationHttpClient(object):
             else:
                 retries_left = MAX_SHORT_RETRIES
 
-            url_bytes = urllib.parse.urlunparse((
-                b"matrix", destination_bytes,
-                path_bytes, None, query_bytes, b"",
-            ))
-            url_str = url_bytes.decode('ascii')
+            url_bytes = urllib.parse.urlunparse(
+                (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
+            )
+            url_str = url_bytes.decode("ascii")
 
-            url_to_sign_bytes = urllib.parse.urlunparse((
-                b"", b"",
-                path_bytes, None, query_bytes, b"",
-            ))
+            url_to_sign_bytes = urllib.parse.urlunparse(
+                (b"", b"", path_bytes, None, query_bytes, b"")
+            )
 
             while True:
                 try:
@@ -379,26 +364,27 @@ class MatrixFederationHttpClient(object):
                     if json:
                         headers_dict[b"Content-Type"] = [b"application/json"]
                         auth_headers = self.build_auth_headers(
-                            destination_bytes, method_bytes, url_to_sign_bytes,
-                            json,
+                            destination_bytes, method_bytes, url_to_sign_bytes, json
                         )
                         data = encode_canonical_json(json)
                         producer = QuieterFileBodyProducer(
-                            BytesIO(data),
-                            cooperator=self._cooperator,
+                            BytesIO(data), cooperator=self._cooperator
                         )
                     else:
                         producer = None
                         auth_headers = self.build_auth_headers(
-                            destination_bytes, method_bytes, url_to_sign_bytes,
+                            destination_bytes, method_bytes, url_to_sign_bytes
                         )
 
                     headers_dict[b"Authorization"] = auth_headers
 
                     logger.info(
                         "{%s} [%s] Sending request: %s %s; timeout %fs",
-                        request.txn_id, request.destination, request.method,
-                        url_str, _sec_timeout,
+                        request.txn_id,
+                        request.destination,
+                        request.method,
+                        url_str,
+                        _sec_timeout,
                     )
 
                     try:
@@ -430,7 +416,7 @@ class MatrixFederationHttpClient(object):
                         request.txn_id,
                         request.destination,
                         response.code,
-                        response.phrase.decode('ascii', errors='replace'),
+                        response.phrase.decode("ascii", errors="replace"),
                     )
 
                     if 200 <= response.code < 300:
@@ -440,9 +426,7 @@ class MatrixFederationHttpClient(object):
                         # Update transactions table?
                         d = treq.content(response)
                         d = timeout_deferred(
-                            d,
-                            timeout=_sec_timeout,
-                            reactor=self.reactor,
+                            d, timeout=_sec_timeout, reactor=self.reactor
                         )
 
                         try:
@@ -460,9 +444,7 @@ class MatrixFederationHttpClient(object):
                             )
                             body = None
 
-                        e = HttpResponseException(
-                            response.code, response.phrase, body
-                        )
+                        e = HttpResponseException(response.code, response.phrase, body)
 
                         # Retry if the error is a 429 (Too Many Requests),
                         # otherwise just raise a standard HttpResponseException
@@ -521,7 +503,7 @@ class MatrixFederationHttpClient(object):
             defer.returnValue(response)
 
     def build_auth_headers(
-        self, destination, method, url_bytes, content=None, destination_is=None,
+        self, destination, method, url_bytes, content=None, destination_is=None
     ):
         """
         Builds the Authorization headers for a federation request
@@ -538,11 +520,7 @@ class MatrixFederationHttpClient(object):
         Returns:
             list[bytes]: a list of headers to be added as "Authorization:" headers
         """
-        request = {
-            "method": method,
-            "uri": url_bytes,
-            "origin": self.server_name,
-        }
+        request = {"method": method, "uri": url_bytes, "origin": self.server_name}
 
         if destination is not None:
             request["destination"] = destination
@@ -558,20 +536,28 @@ class MatrixFederationHttpClient(object):
         auth_headers = []
 
         for key, sig in request["signatures"][self.server_name].items():
-            auth_headers.append((
-                "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
-                    self.server_name, key, sig,
-                )).encode('ascii')
+            auth_headers.append(
+                (
+                    'X-Matrix origin=%s,key="%s",sig="%s"'
+                    % (self.server_name, key, sig)
+                ).encode("ascii")
             )
         return auth_headers
 
     @defer.inlineCallbacks
-    def put_json(self, destination, path, args={}, data={},
-                 json_data_callback=None,
-                 long_retries=False, timeout=None,
-                 ignore_backoff=False,
-                 backoff_on_404=False,
-                 try_trailing_slash_on_400=False):
+    def put_json(
+        self,
+        destination,
+        path,
+        args={},
+        data={},
+        json_data_callback=None,
+        long_retries=False,
+        timeout=None,
+        ignore_backoff=False,
+        backoff_on_404=False,
+        try_trailing_slash_on_400=False,
+    ):
         """ Sends the specifed json data using PUT
 
         Args:
@@ -635,14 +621,22 @@ class MatrixFederationHttpClient(object):
         )
 
         body = yield _handle_json_response(
-            self.reactor, self.default_timeout, request, response,
+            self.reactor, self.default_timeout, request, response
         )
 
         defer.returnValue(body)
 
     @defer.inlineCallbacks
-    def post_json(self, destination, path, data={}, long_retries=False,
-                  timeout=None, ignore_backoff=False, args={}):
+    def post_json(
+        self,
+        destination,
+        path,
+        data={},
+        long_retries=False,
+        timeout=None,
+        ignore_backoff=False,
+        args={},
+    ):
         """ Sends the specifed json data using POST
 
         Args:
@@ -681,11 +675,7 @@ class MatrixFederationHttpClient(object):
         """
 
         request = MatrixFederationRequest(
-            method="POST",
-            destination=destination,
-            path=path,
-            query=args,
-            json=data,
+            method="POST", destination=destination, path=path, query=args, json=data
         )
 
         response = yield self._send_request(
@@ -701,14 +691,21 @@ class MatrixFederationHttpClient(object):
             _sec_timeout = self.default_timeout
 
         body = yield _handle_json_response(
-            self.reactor, _sec_timeout, request, response,
+            self.reactor, _sec_timeout, request, response
         )
         defer.returnValue(body)
 
     @defer.inlineCallbacks
-    def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
-                 timeout=None, ignore_backoff=False,
-                 try_trailing_slash_on_400=False):
+    def get_json(
+        self,
+        destination,
+        path,
+        args=None,
+        retry_on_dns_fail=True,
+        timeout=None,
+        ignore_backoff=False,
+        try_trailing_slash_on_400=False,
+    ):
         """ GETs some json from the given host homeserver and path
 
         Args:
@@ -745,10 +742,7 @@ class MatrixFederationHttpClient(object):
                 remote, due to e.g. DNS failures, connection timeouts etc.
         """
         request = MatrixFederationRequest(
-            method="GET",
-            destination=destination,
-            path=path,
-            query=args,
+            method="GET", destination=destination, path=path, query=args
         )
 
         response = yield self._send_request_with_optional_trailing_slash(
@@ -761,14 +755,21 @@ class MatrixFederationHttpClient(object):
         )
 
         body = yield _handle_json_response(
-            self.reactor, self.default_timeout, request, response,
+            self.reactor, self.default_timeout, request, response
         )
 
         defer.returnValue(body)
 
     @defer.inlineCallbacks
-    def delete_json(self, destination, path, long_retries=False,
-                    timeout=None, ignore_backoff=False, args={}):
+    def delete_json(
+        self,
+        destination,
+        path,
+        long_retries=False,
+        timeout=None,
+        ignore_backoff=False,
+        args={},
+    ):
         """Send a DELETE request to the remote expecting some json response
 
         Args:
@@ -802,10 +803,7 @@ class MatrixFederationHttpClient(object):
                 remote, due to e.g. DNS failures, connection timeouts etc.
         """
         request = MatrixFederationRequest(
-            method="DELETE",
-            destination=destination,
-            path=path,
-            query=args,
+            method="DELETE", destination=destination, path=path, query=args
         )
 
         response = yield self._send_request(
@@ -816,14 +814,21 @@ class MatrixFederationHttpClient(object):
         )
 
         body = yield _handle_json_response(
-            self.reactor, self.default_timeout, request, response,
+            self.reactor, self.default_timeout, request, response
         )
         defer.returnValue(body)
 
     @defer.inlineCallbacks
-    def get_file(self, destination, path, output_stream, args={},
-                 retry_on_dns_fail=True, max_size=None,
-                 ignore_backoff=False):
+    def get_file(
+        self,
+        destination,
+        path,
+        output_stream,
+        args={},
+        retry_on_dns_fail=True,
+        max_size=None,
+        ignore_backoff=False,
+    ):
         """GETs a file from a given homeserver
         Args:
             destination (str): The remote server to send the HTTP request to.
@@ -848,16 +853,11 @@ class MatrixFederationHttpClient(object):
                 remote, due to e.g. DNS failures, connection timeouts etc.
         """
         request = MatrixFederationRequest(
-            method="GET",
-            destination=destination,
-            path=path,
-            query=args,
+            method="GET", destination=destination, path=path, query=args
         )
 
         response = yield self._send_request(
-            request,
-            retry_on_dns_fail=retry_on_dns_fail,
-            ignore_backoff=ignore_backoff,
+            request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
         )
 
         headers = dict(response.headers.getAllRawHeaders())
@@ -879,7 +879,7 @@ class MatrixFederationHttpClient(object):
             request.txn_id,
             request.destination,
             response.code,
-            response.phrase.decode('ascii', errors='replace'),
+            response.phrase.decode("ascii", errors="replace"),
             length,
         )
         defer.returnValue((length, headers))
@@ -896,11 +896,13 @@ class _ReadBodyToFileProtocol(protocol.Protocol):
         self.stream.write(data)
         self.length += len(data)
         if self.max_size is not None and self.length >= self.max_size:
-            self.deferred.errback(SynapseError(
-                502,
-                "Requested file is too large > %r bytes" % (self.max_size,),
-                Codes.TOO_LARGE,
-            ))
+            self.deferred.errback(
+                SynapseError(
+                    502,
+                    "Requested file is too large > %r bytes" % (self.max_size,),
+                    Codes.TOO_LARGE,
+                )
+            )
             self.deferred = defer.Deferred()
             self.transport.loseConnection()
 
@@ -920,8 +922,7 @@ def _readBodyToFile(response, stream, max_size):
 def _flatten_response_never_received(e):
     if hasattr(e, "reasons"):
         reasons = ", ".join(
-            _flatten_response_never_received(f.value)
-            for f in e.reasons
+            _flatten_response_never_received(f.value) for f in e.reasons
         )
 
         return "%s:[%s]" % (type(e).__name__, reasons)
@@ -943,16 +944,15 @@ def check_content_type_is_json(headers):
     """
     c_type = headers.getRawHeaders(b"Content-Type")
     if c_type is None:
-        raise RequestSendFailed(RuntimeError(
-            "No Content-Type header"
-        ), can_retry=False)
+        raise RequestSendFailed(RuntimeError("No Content-Type header"), can_retry=False)
 
-    c_type = c_type[0].decode('ascii')  # only the first header
+    c_type = c_type[0].decode("ascii")  # only the first header
     val, options = cgi.parse_header(c_type)
     if val != "application/json":
-        raise RequestSendFailed(RuntimeError(
-            "Content-Type not application/json: was '%s'" % c_type
-        ), can_retry=False)
+        raise RequestSendFailed(
+            RuntimeError("Content-Type not application/json: was '%s'" % c_type),
+            can_retry=False,
+        )
 
 
 def encode_query_args(args):
@@ -967,4 +967,4 @@ def encode_query_args(args):
 
     query_bytes = urllib.parse.urlencode(encoded_args, True)
 
-    return query_bytes.encode('utf8')
+    return query_bytes.encode("utf8")