diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 24b6110c20..1682c9af13 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -19,7 +19,7 @@ import random
import sys
from io import BytesIO
-from six import PY3, string_types
+from six import PY3, raise_from, string_types
from six.moves import urllib
import attr
@@ -32,7 +32,6 @@ from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
from twisted.web.http_headers import Headers
import synapse.metrics
@@ -41,9 +40,11 @@ from synapse.api.errors import (
Codes,
FederationDeniedError,
HttpResponseException,
+ RequestSendFailed,
SynapseError,
)
-from synapse.http.endpoint import matrix_federation_endpoint
+from synapse.http import QuieterFileBodyProducer
+from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure
@@ -65,20 +66,6 @@ else:
MAXINT = sys.maxint
-class MatrixFederationEndpointFactory(object):
- def __init__(self, hs):
- self.reactor = hs.get_reactor()
- self.tls_client_options_factory = hs.tls_client_options_factory
-
- def endpointForURI(self, uri):
- destination = uri.netloc.decode('ascii')
-
- return matrix_federation_endpoint(
- self.reactor, destination, timeout=10,
- tls_client_options_factory=self.tls_client_options_factory
- )
-
-
_next_id = 1
@@ -181,17 +168,15 @@ class MatrixFederationHttpClient(object):
requests.
"""
- def __init__(self, hs):
+ def __init__(self, hs, tls_client_options_factory):
self.hs = hs
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
reactor = hs.get_reactor()
- pool = HTTPConnectionPool(reactor)
- pool.retryAutomatically = False
- pool.maxPersistentPerHost = 5
- pool.cachedConnectionTimeout = 2 * 60
- self.agent = Agent.usingEndpointFactory(
- reactor, MatrixFederationEndpointFactory(hs), pool=pool
+
+ self.agent = MatrixFederationAgent(
+ hs.get_reactor(),
+ tls_client_options_factory,
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
@@ -228,19 +213,18 @@ class MatrixFederationHttpClient(object):
backoff_on_404 (bool): Back off if we get a 404
Returns:
- Deferred: resolves with the http response object on success.
-
- Fails with ``HttpResponseException``: if we get an HTTP response
- code >= 300.
-
- Fails with ``NotRetryingDestination`` if we are not yet ready
- to retry this server.
-
- Fails with ``FederationDeniedError`` if this destination
- is not on our federation whitelist
-
- (May also fail with plenty of other Exceptions for things like DNS
- failures, connection failures, SSL failures.)
+ Deferred[twisted.web.client.Response]: resolves with the HTTP
+ response object on success.
+
+ Raises:
+ HttpResponseException: If we get an HTTP response code >= 300
+ (except 429).
+ NotRetryingDestination: If we are not yet ready to retry this
+ server.
+ FederationDeniedError: If this destination is not on our
+ federation whitelist
+ RequestSendFailed: If there were problems connecting to the
+ remote, due to e.g. DNS failures, connection timeouts etc.
"""
if timeout:
_sec_timeout = timeout / 1000
@@ -271,7 +255,6 @@ class MatrixFederationHttpClient(object):
headers_dict = {
b"User-Agent": [self.version_string_bytes],
- b"Host": [destination_bytes],
}
with limiter:
@@ -298,60 +281,111 @@ class MatrixFederationHttpClient(object):
json = request.get_json()
if json:
headers_dict[b"Content-Type"] = [b"application/json"]
- self.sign_request(
+ auth_headers = self.build_auth_headers(
destination_bytes, method_bytes, url_to_sign_bytes,
- headers_dict, json,
+ json,
)
data = encode_canonical_json(json)
- producer = FileBodyProducer(
+ producer = QuieterFileBodyProducer(
BytesIO(data),
cooperator=self._cooperator,
)
else:
producer = None
- self.sign_request(
+ auth_headers = self.build_auth_headers(
destination_bytes, method_bytes, url_to_sign_bytes,
- headers_dict,
)
+ headers_dict[b"Authorization"] = auth_headers
+
logger.info(
- "{%s} [%s] Sending request: %s %s",
+ "{%s} [%s] Sending request: %s %s; timeout %fs",
request.txn_id, request.destination, request.method,
- url_str,
+ url_str, _sec_timeout,
)
- # we don't want all the fancy cookie and redirect handling that
- # treq.request gives: just use the raw Agent.
- request_deferred = self.agent.request(
- method_bytes,
- url_bytes,
- headers=Headers(headers_dict),
- bodyProducer=producer,
- )
+ try:
+ with Measure(self.clock, "outbound_request"):
+ # we don't want all the fancy cookie and redirect handling
+ # that treq.request gives: just use the raw Agent.
+ request_deferred = self.agent.request(
+ method_bytes,
+ url_bytes,
+ headers=Headers(headers_dict),
+ bodyProducer=producer,
+ )
+
+ request_deferred = timeout_deferred(
+ request_deferred,
+ timeout=_sec_timeout,
+ reactor=self.hs.get_reactor(),
+ )
+
+ response = yield request_deferred
+ except DNSLookupError as e:
+ raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
+ except Exception as e:
+ logger.info("Failed to send request: %s", e)
+ raise_from(RequestSendFailed(e, can_retry=True), e)
- request_deferred = timeout_deferred(
- request_deferred,
- timeout=_sec_timeout,
- reactor=self.hs.get_reactor(),
+ logger.info(
+ "{%s} [%s] Got response headers: %d %s",
+ request.txn_id,
+ request.destination,
+ response.code,
+ response.phrase.decode('ascii', errors='replace'),
)
- with Measure(self.clock, "outbound_request"):
- response = yield make_deferred_yieldable(
- request_deferred,
+ if 200 <= response.code < 300:
+ pass
+ else:
+ # :'(
+ # Update transactions table?
+ d = treq.content(response)
+ d = timeout_deferred(
+ d,
+ timeout=_sec_timeout,
+ reactor=self.hs.get_reactor(),
+ )
+
+ try:
+ body = yield make_deferred_yieldable(d)
+ except Exception as e:
+ # Eh, we're already going to raise an exception so lets
+ # ignore if this fails.
+ logger.warn(
+ "{%s} [%s] Failed to get error response: %s %s: %s",
+ request.txn_id,
+ request.destination,
+ request.method,
+ url_str,
+ _flatten_response_never_received(e),
+ )
+ body = None
+
+ e = HttpResponseException(
+ response.code, response.phrase, body
)
+ # Retry if the error is a 429 (Too Many Requests),
+ # otherwise just raise a standard HttpResponseException
+ if response.code == 429:
+ raise_from(RequestSendFailed(e, can_retry=True), e)
+ else:
+ raise e
+
break
- except Exception as e:
+ except RequestSendFailed as e:
logger.warn(
"{%s} [%s] Request failed: %s %s: %s",
request.txn_id,
request.destination,
request.method,
url_str,
- _flatten_response_never_received(e),
+ _flatten_response_never_received(e.inner_exception),
)
- if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+ if not e.can_retry:
raise
if retries_left and not timeout:
@@ -376,50 +410,36 @@ class MatrixFederationHttpClient(object):
else:
raise
- logger.info(
- "{%s} [%s] Got response headers: %d %s",
- request.txn_id,
- request.destination,
- response.code,
- response.phrase.decode('ascii', errors='replace'),
- )
-
- if 200 <= response.code < 300:
- pass
- else:
- # :'(
- # Update transactions table?
- d = treq.content(response)
- d = timeout_deferred(
- d,
- timeout=_sec_timeout,
- reactor=self.hs.get_reactor(),
- )
- body = yield make_deferred_yieldable(d)
- raise HttpResponseException(
- response.code, response.phrase, body
- )
+ except Exception as e:
+ logger.warn(
+ "{%s} [%s] Request failed: %s %s: %s",
+ request.txn_id,
+ request.destination,
+ request.method,
+ url_str,
+ _flatten_response_never_received(e),
+ )
+ raise
defer.returnValue(response)
- def sign_request(self, destination, method, url_bytes, headers_dict,
- content=None, destination_is=None):
+ def build_auth_headers(
+ self, destination, method, url_bytes, content=None, destination_is=None,
+ ):
"""
- Signs a request by adding an Authorization header to headers_dict
+ Builds the Authorization headers for a federation request
Args:
destination (bytes|None): The desination home server of the request.
May be None if the destination is an identity server, in which case
destination_is must be non-None.
method (bytes): The HTTP method of the request
url_bytes (bytes): The URI path of the request
- headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to
- append to
content (object): The body of the request
destination_is (bytes): As 'destination', but if the destination is an
identity server
Returns:
- None
+ list[bytes]: a list of headers to be added as "Authorization:" headers
"""
request = {
"method": method,
@@ -446,8 +466,7 @@ class MatrixFederationHttpClient(object):
self.server_name, key, sig,
)).encode('ascii')
)
-
- headers_dict[b"Authorization"] = auth_headers
+ return auth_headers
@defer.inlineCallbacks
def put_json(self, destination, path, args={}, data={},
@@ -477,17 +496,18 @@ class MatrixFederationHttpClient(object):
requests)
Returns:
- Deferred: Succeeds when we get a 2xx HTTP response. The result
- will be the decoded JSON body.
-
- Fails with ``HttpResponseException`` if we get an HTTP response
- code >= 300.
-
- Fails with ``NotRetryingDestination`` if we are not yet ready
- to retry this server.
-
- Fails with ``FederationDeniedError`` if this destination
- is not on our federation whitelist
+ Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+ result will be the decoded JSON body.
+
+ Raises:
+ HttpResponseException: If we get an HTTP response code >= 300
+ (except 429).
+ NotRetryingDestination: If we are not yet ready to retry this
+ server.
+ FederationDeniedError: If this destination is not on our
+ federation whitelist
+ RequestSendFailed: If there were problems connecting to the
+ remote, due to e.g. DNS failures, connection timeouts etc.
"""
request = MatrixFederationRequest(
@@ -531,17 +551,18 @@ class MatrixFederationHttpClient(object):
try the request anyway.
args (dict): query params
Returns:
- Deferred: Succeeds when we get a 2xx HTTP response. The result
- will be the decoded JSON body.
-
- Fails with ``HttpResponseException`` if we get an HTTP response
- code >= 300.
-
- Fails with ``NotRetryingDestination`` if we are not yet ready
- to retry this server.
-
- Fails with ``FederationDeniedError`` if this destination
- is not on our federation whitelist
+ Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+ result will be the decoded JSON body.
+
+ Raises:
+ HttpResponseException: If we get an HTTP response code >= 300
+ (except 429).
+ NotRetryingDestination: If we are not yet ready to retry this
+ server.
+ FederationDeniedError: If this destination is not on our
+ federation whitelist
+ RequestSendFailed: If there were problems connecting to the
+ remote, due to e.g. DNS failures, connection timeouts etc.
"""
request = MatrixFederationRequest(
@@ -586,17 +607,18 @@ class MatrixFederationHttpClient(object):
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
Returns:
- Deferred: Succeeds when we get a 2xx HTTP response. The result
- will be the decoded JSON body.
-
- Fails with ``HttpResponseException`` if we get an HTTP response
- code >= 300.
-
- Fails with ``NotRetryingDestination`` if we are not yet ready
- to retry this server.
-
- Fails with ``FederationDeniedError`` if this destination
- is not on our federation whitelist
+ Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+ result will be the decoded JSON body.
+
+ Raises:
+ HttpResponseException: If we get an HTTP response code >= 300
+ (except 429).
+ NotRetryingDestination: If we are not yet ready to retry this
+ server.
+ FederationDeniedError: If this destination is not on our
+ federation whitelist
+ RequestSendFailed: If there were problems connecting to the
+ remote, due to e.g. DNS failures, connection timeouts etc.
"""
logger.debug("get_json args: %s", args)
@@ -637,17 +659,18 @@ class MatrixFederationHttpClient(object):
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
Returns:
- Deferred: Succeeds when we get a 2xx HTTP response. The result
- will be the decoded JSON body.
-
- Fails with ``HttpResponseException`` if we get an HTTP response
- code >= 300.
-
- Fails with ``NotRetryingDestination`` if we are not yet ready
- to retry this server.
-
- Fails with ``FederationDeniedError`` if this destination
- is not on our federation whitelist
+ Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+ result will be the decoded JSON body.
+
+ Raises:
+ HttpResponseException: If we get an HTTP response code >= 300
+ (except 429).
+ NotRetryingDestination: If we are not yet ready to retry this
+ server.
+ FederationDeniedError: If this destination is not on our
+ federation whitelist
+ RequestSendFailed: If there were problems connecting to the
+ remote, due to e.g. DNS failures, connection timeouts etc.
"""
request = MatrixFederationRequest(
method="DELETE",
@@ -680,18 +703,20 @@ class MatrixFederationHttpClient(object):
args (dict): Optional dictionary used to create the query string.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
- Returns:
- Deferred: resolves with an (int,dict) tuple of the file length and
- a dict of the response headers.
-
- Fails with ``HttpResponseException`` if we get an HTTP response code
- >= 300
-
- Fails with ``NotRetryingDestination`` if we are not yet ready
- to retry this server.
- Fails with ``FederationDeniedError`` if this destination
- is not on our federation whitelist
+ Returns:
+ Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of
+ the file length and a dict of the response headers.
+
+ Raises:
+ HttpResponseException: If we get an HTTP response code >= 300
+ (except 429).
+ NotRetryingDestination: If we are not yet ready to retry this
+ server.
+ FederationDeniedError: If this destination is not on our
+ federation whitelist
+ RequestSendFailed: If there were problems connecting to the
+ remote, due to e.g. DNS failures, connection timeouts etc.
"""
request = MatrixFederationRequest(
method="GET",
@@ -784,21 +809,21 @@ def check_content_type_is_json(headers):
headers (twisted.web.http_headers.Headers): headers to check
Raises:
- RuntimeError if the
+ RequestSendFailed: if the Content-Type header is missing or isn't JSON
"""
c_type = headers.getRawHeaders(b"Content-Type")
if c_type is None:
- raise RuntimeError(
+ raise RequestSendFailed(RuntimeError(
"No Content-Type header"
- )
+ ), can_retry=False)
c_type = c_type[0].decode('ascii') # only the first header
val, options = cgi.parse_header(c_type)
if val != "application/json":
- raise RuntimeError(
+ raise RequestSendFailed(RuntimeError(
"Content-Type not application/json: was '%s'" % c_type
- )
+ ), can_retry=False)
def encode_query_args(args):
|