summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py333
1 files changed, 179 insertions, 154 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 24b6110c20..1682c9af13 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -19,7 +19,7 @@ import random
 import sys
 from io import BytesIO
 
-from six import PY3, string_types
+from six import PY3, raise_from, string_types
 from six.moves import urllib
 
 import attr
@@ -32,7 +32,6 @@ from twisted.internet import defer, protocol
 from twisted.internet.error import DNSLookupError
 from twisted.internet.task import _EPSILON, Cooperator
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
 from twisted.web.http_headers import Headers
 
 import synapse.metrics
@@ -41,9 +40,11 @@ from synapse.api.errors import (
     Codes,
     FederationDeniedError,
     HttpResponseException,
+    RequestSendFailed,
     SynapseError,
 )
-from synapse.http.endpoint import matrix_federation_endpoint
+from synapse.http import QuieterFileBodyProducer
+from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
 from synapse.util.async_helpers import timeout_deferred
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.metrics import Measure
@@ -65,20 +66,6 @@ else:
     MAXINT = sys.maxint
 
 
-class MatrixFederationEndpointFactory(object):
-    def __init__(self, hs):
-        self.reactor = hs.get_reactor()
-        self.tls_client_options_factory = hs.tls_client_options_factory
-
-    def endpointForURI(self, uri):
-        destination = uri.netloc.decode('ascii')
-
-        return matrix_federation_endpoint(
-            self.reactor, destination, timeout=10,
-            tls_client_options_factory=self.tls_client_options_factory
-        )
-
-
 _next_id = 1
 
 
@@ -181,17 +168,15 @@ class MatrixFederationHttpClient(object):
             requests.
     """
 
-    def __init__(self, hs):
+    def __init__(self, hs, tls_client_options_factory):
         self.hs = hs
         self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
         reactor = hs.get_reactor()
-        pool = HTTPConnectionPool(reactor)
-        pool.retryAutomatically = False
-        pool.maxPersistentPerHost = 5
-        pool.cachedConnectionTimeout = 2 * 60
-        self.agent = Agent.usingEndpointFactory(
-            reactor, MatrixFederationEndpointFactory(hs), pool=pool
+
+        self.agent = MatrixFederationAgent(
+            hs.get_reactor(),
+            tls_client_options_factory,
         )
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
@@ -228,19 +213,18 @@ class MatrixFederationHttpClient(object):
             backoff_on_404 (bool): Back off if we get a 404
 
         Returns:
-            Deferred: resolves with the http response object on success.
-
-            Fails with ``HttpResponseException``: if we get an HTTP response
-                code >= 300.
-
-            Fails with ``NotRetryingDestination`` if we are not yet ready
-                to retry this server.
-
-            Fails with ``FederationDeniedError`` if this destination
-                is not on our federation whitelist
-
-            (May also fail with plenty of other Exceptions for things like DNS
-                failures, connection failures, SSL failures.)
+            Deferred[twisted.web.client.Response]: resolves with the HTTP
+            response object on success.
+
+        Raises:
+            HttpResponseException: If we get an HTTP response code >= 300
+                (except 429).
+            NotRetryingDestination: If we are not yet ready to retry this
+                server.
+            FederationDeniedError: If this destination  is not on our
+                federation whitelist
+            RequestSendFailed: If there were problems connecting to the
+                remote, due to e.g. DNS failures, connection timeouts etc.
         """
         if timeout:
             _sec_timeout = timeout / 1000
@@ -271,7 +255,6 @@ class MatrixFederationHttpClient(object):
 
         headers_dict = {
             b"User-Agent": [self.version_string_bytes],
-            b"Host": [destination_bytes],
         }
 
         with limiter:
@@ -298,60 +281,111 @@ class MatrixFederationHttpClient(object):
                     json = request.get_json()
                     if json:
                         headers_dict[b"Content-Type"] = [b"application/json"]
-                        self.sign_request(
+                        auth_headers = self.build_auth_headers(
                             destination_bytes, method_bytes, url_to_sign_bytes,
-                            headers_dict, json,
+                            json,
                         )
                         data = encode_canonical_json(json)
-                        producer = FileBodyProducer(
+                        producer = QuieterFileBodyProducer(
                             BytesIO(data),
                             cooperator=self._cooperator,
                         )
                     else:
                         producer = None
-                        self.sign_request(
+                        auth_headers = self.build_auth_headers(
                             destination_bytes, method_bytes, url_to_sign_bytes,
-                            headers_dict,
                         )
 
+                    headers_dict[b"Authorization"] = auth_headers
+
                     logger.info(
-                        "{%s} [%s] Sending request: %s %s",
+                        "{%s} [%s] Sending request: %s %s; timeout %fs",
                         request.txn_id, request.destination, request.method,
-                        url_str,
+                        url_str, _sec_timeout,
                     )
 
-                    # we don't want all the fancy cookie and redirect handling that
-                    # treq.request gives: just use the raw Agent.
-                    request_deferred = self.agent.request(
-                        method_bytes,
-                        url_bytes,
-                        headers=Headers(headers_dict),
-                        bodyProducer=producer,
-                    )
+                    try:
+                        with Measure(self.clock, "outbound_request"):
+                            # we don't want all the fancy cookie and redirect handling
+                            # that treq.request gives: just use the raw Agent.
+                            request_deferred = self.agent.request(
+                                method_bytes,
+                                url_bytes,
+                                headers=Headers(headers_dict),
+                                bodyProducer=producer,
+                            )
+
+                            request_deferred = timeout_deferred(
+                                request_deferred,
+                                timeout=_sec_timeout,
+                                reactor=self.hs.get_reactor(),
+                            )
+
+                            response = yield request_deferred
+                    except DNSLookupError as e:
+                        raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
+                    except Exception as e:
+                        logger.info("Failed to send request: %s", e)
+                        raise_from(RequestSendFailed(e, can_retry=True), e)
 
-                    request_deferred = timeout_deferred(
-                        request_deferred,
-                        timeout=_sec_timeout,
-                        reactor=self.hs.get_reactor(),
+                    logger.info(
+                        "{%s} [%s] Got response headers: %d %s",
+                        request.txn_id,
+                        request.destination,
+                        response.code,
+                        response.phrase.decode('ascii', errors='replace'),
                     )
 
-                    with Measure(self.clock, "outbound_request"):
-                        response = yield make_deferred_yieldable(
-                            request_deferred,
+                    if 200 <= response.code < 300:
+                        pass
+                    else:
+                        # :'(
+                        # Update transactions table?
+                        d = treq.content(response)
+                        d = timeout_deferred(
+                            d,
+                            timeout=_sec_timeout,
+                            reactor=self.hs.get_reactor(),
+                        )
+
+                        try:
+                            body = yield make_deferred_yieldable(d)
+                        except Exception as e:
+                            # Eh, we're already going to raise an exception so lets
+                            # ignore if this fails.
+                            logger.warn(
+                                "{%s} [%s] Failed to get error response: %s %s: %s",
+                                request.txn_id,
+                                request.destination,
+                                request.method,
+                                url_str,
+                                _flatten_response_never_received(e),
+                            )
+                            body = None
+
+                        e = HttpResponseException(
+                            response.code, response.phrase, body
                         )
 
+                        # Retry if the error is a 429 (Too Many Requests),
+                        # otherwise just raise a standard HttpResponseException
+                        if response.code == 429:
+                            raise_from(RequestSendFailed(e, can_retry=True), e)
+                        else:
+                            raise e
+
                     break
-                except Exception as e:
+                except RequestSendFailed as e:
                     logger.warn(
                         "{%s} [%s] Request failed: %s %s: %s",
                         request.txn_id,
                         request.destination,
                         request.method,
                         url_str,
-                        _flatten_response_never_received(e),
+                        _flatten_response_never_received(e.inner_exception),
                     )
 
-                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+                    if not e.can_retry:
                         raise
 
                     if retries_left and not timeout:
@@ -376,50 +410,36 @@ class MatrixFederationHttpClient(object):
                     else:
                         raise
 
-            logger.info(
-                "{%s} [%s] Got response headers: %d %s",
-                request.txn_id,
-                request.destination,
-                response.code,
-                response.phrase.decode('ascii', errors='replace'),
-            )
-
-            if 200 <= response.code < 300:
-                pass
-            else:
-                # :'(
-                # Update transactions table?
-                d = treq.content(response)
-                d = timeout_deferred(
-                    d,
-                    timeout=_sec_timeout,
-                    reactor=self.hs.get_reactor(),
-                )
-                body = yield make_deferred_yieldable(d)
-                raise HttpResponseException(
-                    response.code, response.phrase, body
-                )
+                except Exception as e:
+                    logger.warn(
+                        "{%s} [%s] Request failed: %s %s: %s",
+                        request.txn_id,
+                        request.destination,
+                        request.method,
+                        url_str,
+                        _flatten_response_never_received(e),
+                    )
+                    raise
 
             defer.returnValue(response)
 
-    def sign_request(self, destination, method, url_bytes, headers_dict,
-                     content=None, destination_is=None):
+    def build_auth_headers(
+        self, destination, method, url_bytes, content=None, destination_is=None,
+    ):
         """
-        Signs a request by adding an Authorization header to headers_dict
+        Builds the Authorization headers for a federation request
         Args:
             destination (bytes|None): The desination home server of the request.
                 May be None if the destination is an identity server, in which case
                 destination_is must be non-None.
             method (bytes): The HTTP method of the request
             url_bytes (bytes): The URI path of the request
-            headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to
-                append to
             content (object): The body of the request
             destination_is (bytes): As 'destination', but if the destination is an
                 identity server
 
         Returns:
-            None
+            list[bytes]: a list of headers to be added as "Authorization:" headers
         """
         request = {
             "method": method,
@@ -446,8 +466,7 @@ class MatrixFederationHttpClient(object):
                     self.server_name, key, sig,
                 )).encode('ascii')
             )
-
-        headers_dict[b"Authorization"] = auth_headers
+        return auth_headers
 
     @defer.inlineCallbacks
     def put_json(self, destination, path, args={}, data={},
@@ -477,17 +496,18 @@ class MatrixFederationHttpClient(object):
                 requests)
 
         Returns:
-            Deferred: Succeeds when we get a 2xx HTTP response. The result
-            will be the decoded JSON body.
-
-            Fails with ``HttpResponseException`` if we get an HTTP response
-            code >= 300.
-
-            Fails with ``NotRetryingDestination`` if we are not yet ready
-            to retry this server.
-
-            Fails with ``FederationDeniedError`` if this destination
-            is not on our federation whitelist
+            Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+            result will be the decoded JSON body.
+
+        Raises:
+            HttpResponseException: If we get an HTTP response code >= 300
+                (except 429).
+            NotRetryingDestination: If we are not yet ready to retry this
+                server.
+            FederationDeniedError: If this destination  is not on our
+                federation whitelist
+            RequestSendFailed: If there were problems connecting to the
+                remote, due to e.g. DNS failures, connection timeouts etc.
         """
 
         request = MatrixFederationRequest(
@@ -531,17 +551,18 @@ class MatrixFederationHttpClient(object):
                 try the request anyway.
             args (dict): query params
         Returns:
-            Deferred: Succeeds when we get a 2xx HTTP response. The result
-            will be the decoded JSON body.
-
-            Fails with ``HttpResponseException`` if we get an HTTP response
-            code >= 300.
-
-            Fails with ``NotRetryingDestination`` if we are not yet ready
-            to retry this server.
-
-            Fails with ``FederationDeniedError`` if this destination
-            is not on our federation whitelist
+            Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+            result will be the decoded JSON body.
+
+        Raises:
+            HttpResponseException: If we get an HTTP response code >= 300
+                (except 429).
+            NotRetryingDestination: If we are not yet ready to retry this
+                server.
+            FederationDeniedError: If this destination  is not on our
+                federation whitelist
+            RequestSendFailed: If there were problems connecting to the
+                remote, due to e.g. DNS failures, connection timeouts etc.
         """
 
         request = MatrixFederationRequest(
@@ -586,17 +607,18 @@ class MatrixFederationHttpClient(object):
             ignore_backoff (bool): true to ignore the historical backoff data
                 and try the request anyway.
         Returns:
-            Deferred: Succeeds when we get a 2xx HTTP response. The result
-            will be the decoded JSON body.
-
-            Fails with ``HttpResponseException`` if we get an HTTP response
-            code >= 300.
-
-            Fails with ``NotRetryingDestination`` if we are not yet ready
-            to retry this server.
-
-            Fails with ``FederationDeniedError`` if this destination
-            is not on our federation whitelist
+            Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+            result will be the decoded JSON body.
+
+        Raises:
+            HttpResponseException: If we get an HTTP response code >= 300
+                (except 429).
+            NotRetryingDestination: If we are not yet ready to retry this
+                server.
+            FederationDeniedError: If this destination  is not on our
+                federation whitelist
+            RequestSendFailed: If there were problems connecting to the
+                remote, due to e.g. DNS failures, connection timeouts etc.
         """
         logger.debug("get_json args: %s", args)
 
@@ -637,17 +659,18 @@ class MatrixFederationHttpClient(object):
             ignore_backoff (bool): true to ignore the historical backoff data and
                 try the request anyway.
         Returns:
-            Deferred: Succeeds when we get a 2xx HTTP response. The result
-            will be the decoded JSON body.
-
-            Fails with ``HttpResponseException`` if we get an HTTP response
-            code >= 300.
-
-            Fails with ``NotRetryingDestination`` if we are not yet ready
-            to retry this server.
-
-            Fails with ``FederationDeniedError`` if this destination
-            is not on our federation whitelist
+            Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+            result will be the decoded JSON body.
+
+        Raises:
+            HttpResponseException: If we get an HTTP response code >= 300
+                (except 429).
+            NotRetryingDestination: If we are not yet ready to retry this
+                server.
+            FederationDeniedError: If this destination  is not on our
+                federation whitelist
+            RequestSendFailed: If there were problems connecting to the
+                remote, due to e.g. DNS failures, connection timeouts etc.
         """
         request = MatrixFederationRequest(
             method="DELETE",
@@ -680,18 +703,20 @@ class MatrixFederationHttpClient(object):
             args (dict): Optional dictionary used to create the query string.
             ignore_backoff (bool): true to ignore the historical backoff data
                 and try the request anyway.
-        Returns:
-            Deferred: resolves with an (int,dict) tuple of the file length and
-            a dict of the response headers.
-
-            Fails with ``HttpResponseException`` if we get an HTTP response code
-            >= 300
-
-            Fails with ``NotRetryingDestination`` if we are not yet ready
-            to retry this server.
 
-            Fails with ``FederationDeniedError`` if this destination
-            is not on our federation whitelist
+        Returns:
+            Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of
+            the file length and a dict of the response headers.
+
+        Raises:
+            HttpResponseException: If we get an HTTP response code >= 300
+                (except 429).
+            NotRetryingDestination: If we are not yet ready to retry this
+                server.
+            FederationDeniedError: If this destination  is not on our
+                federation whitelist
+            RequestSendFailed: If there were problems connecting to the
+                remote, due to e.g. DNS failures, connection timeouts etc.
         """
         request = MatrixFederationRequest(
             method="GET",
@@ -784,21 +809,21 @@ def check_content_type_is_json(headers):
         headers (twisted.web.http_headers.Headers): headers to check
 
     Raises:
-        RuntimeError if the
+        RequestSendFailed: if the Content-Type header is missing or isn't JSON
 
     """
     c_type = headers.getRawHeaders(b"Content-Type")
     if c_type is None:
-        raise RuntimeError(
+        raise RequestSendFailed(RuntimeError(
             "No Content-Type header"
-        )
+        ), can_retry=False)
 
     c_type = c_type[0].decode('ascii')  # only the first header
     val, options = cgi.parse_header(c_type)
     if val != "application/json":
-        raise RuntimeError(
+        raise RequestSendFailed(RuntimeError(
             "Content-Type not application/json: was '%s'" % c_type
-        )
+        ), can_retry=False)
 
 
 def encode_query_args(args):