diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b34bb8e31a..13b19f7626 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,19 +17,19 @@ import cgi
import logging
import random
import sys
-import urllib
-from six import string_types
-from six.moves.urllib import parse as urlparse
+from six import PY3, string_types
+from six.moves import urllib
-from canonicaljson import encode_canonical_json, json
+import treq
+from canonicaljson import encode_canonical_json
from prometheus_client import Counter
from signedjson.sign import sign_json
-from twisted.internet import defer, protocol, reactor
+from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool, readBody
+from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
import synapse.metrics
@@ -40,11 +40,11 @@ from synapse.api.errors import (
HttpResponseException,
SynapseError,
)
-from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_no_seriously
from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
@@ -58,16 +58,22 @@ incoming_responses_counter = Counter("synapse_http_matrixfederationclient_respon
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
+if PY3:
+ MAXINT = sys.maxsize
+else:
+ MAXINT = sys.maxint
+
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
+ self.reactor = hs.get_reactor()
self.tls_client_options_factory = hs.tls_client_options_factory
def endpointForURI(self, uri):
- destination = uri.netloc
+ destination = uri.netloc.decode('ascii')
return matrix_federation_endpoint(
- reactor, destination, timeout=10,
+ self.reactor, destination, timeout=10,
tls_client_options_factory=self.tls_client_options_factory
)
@@ -85,7 +91,9 @@ class MatrixFederationHttpClient(object):
self.hs = hs
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
+ reactor = hs.get_reactor()
pool = HTTPConnectionPool(reactor)
+ pool.retryAutomatically = False
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
self.agent = Agent.usingEndpointFactory(
@@ -93,26 +101,33 @@ class MatrixFederationHttpClient(object):
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
- self.version_string = hs.version_string
+ self.version_string = hs.version_string.encode('ascii')
self._next_id = 1
+ self.default_timeout = 60
def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
- return urlparse.urlunparse(
- ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
+ return urllib.parse.urlunparse(
+ (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
)
@defer.inlineCallbacks
def _request(self, destination, method, path,
- body_callback, headers_dict={}, param_bytes=b"",
- query_bytes=b"", retry_on_dns_fail=True,
+ json=None, json_callback=None,
+ param_bytes=b"",
+ query=None, retry_on_dns_fail=True,
timeout=None, long_retries=False,
ignore_backoff=False,
backoff_on_404=False):
- """ Creates and sends a request to the given server
+ """
+ Creates and sends a request to the given server.
+
Args:
destination (str): The remote server to send the HTTP request to.
method (str): HTTP method
path (str): The HTTP path
+ json (dict or None): JSON to send in the body.
+ json_callback (func or None): A callback to generate the JSON.
+ query (dict or None): Query arguments.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
backoff_on_404 (bool): Back off if we get a 404
@@ -132,6 +147,11 @@ class MatrixFederationHttpClient(object):
(May also fail with plenty of other Exceptions for things like DNS
failures, connection failures, SSL failures.)
"""
+ if timeout:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
if (
self.hs.config.federation_domain_whitelist is not None and
destination not in self.hs.config.federation_domain_whitelist
@@ -146,23 +166,25 @@ class MatrixFederationHttpClient(object):
ignore_backoff=ignore_backoff,
)
- destination = destination.encode("ascii")
+ headers_dict = {}
path_bytes = path.encode("ascii")
- with limiter:
- headers_dict[b"User-Agent"] = [self.version_string]
- headers_dict[b"Host"] = [destination]
+ if query:
+ query_bytes = encode_query_args(query)
+ else:
+ query_bytes = b""
- url_bytes = self._create_url(
- destination, path_bytes, param_bytes, query_bytes
- )
+ headers_dict = {
+ "User-Agent": [self.version_string],
+ "Host": [destination],
+ }
- txn_id = "%s-O-%s" % (method, self._next_id)
- self._next_id = (self._next_id + 1) % (sys.maxint - 1)
+ with limiter:
+ url = self._create_url(
+ destination.encode("ascii"), path_bytes, param_bytes, query_bytes
+ ).decode('ascii')
- outbound_logger.info(
- "{%s} [%s] Sending request: %s %s",
- txn_id, destination, method, url_bytes
- )
+ txn_id = "%s-O-%s" % (method, self._next_id)
+ self._next_id = (self._next_id + 1) % (MAXINT - 1)
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
@@ -171,80 +193,110 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
- http_url_bytes = urlparse.urlunparse(
- ("", "", path_bytes, param_bytes, query_bytes, "")
- )
+ http_url = urllib.parse.urlunparse(
+ (b"", b"", path_bytes, param_bytes, query_bytes, b"")
+ ).decode('ascii')
log_result = None
- try:
- while True:
- producer = None
- if body_callback:
- producer = body_callback(method, http_url_bytes, headers_dict)
-
- try:
- request_deferred = self.agent.request(
- method,
- url_bytes,
- Headers(headers_dict),
- producer
- )
- add_timeout_to_deferred(
- request_deferred,
- timeout / 1000. if timeout else 60,
- self.hs.get_reactor(),
- cancelled_to_request_timed_out_error,
+ while True:
+ try:
+ if json_callback:
+ json = json_callback()
+
+ if json:
+ data = encode_canonical_json(json)
+ headers_dict["Content-Type"] = ["application/json"]
+ self.sign_request(
+ destination, method, http_url, headers_dict, json
)
+ else:
+ data = None
+ self.sign_request(destination, method, http_url, headers_dict)
+
+ outbound_logger.info(
+ "{%s} [%s] Sending request: %s %s",
+ txn_id, destination, method, url
+ )
+
+ request_deferred = treq.request(
+ method,
+ url,
+ headers=Headers(headers_dict),
+ data=data,
+ agent=self.agent,
+ reactor=self.hs.get_reactor(),
+ unbuffered=True
+ )
+ request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
+
+ # Sometimes the timeout above doesn't work, so lets hack yet
+ # another layer of timeouts in in the vain hope that at some
+ # point the world made sense and this really really really
+ # should work.
+ request_deferred = timeout_no_seriously(
+ request_deferred,
+ timeout=_sec_timeout * 2,
+ reactor=self.hs.get_reactor(),
+ )
+
+ with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
)
- log_result = "%d %s" % (response.code, response.phrase,)
- break
- except Exception as e:
- if not retry_on_dns_fail and isinstance(e, DNSLookupError):
- logger.warn(
- "DNS Lookup failed to %s with %s",
- destination,
- e
- )
- log_result = "DNS Lookup failed to %s with %s" % (
- destination, e
- )
- raise
-
+ log_result = "%d %s" % (response.code, response.phrase,)
+ break
+ except Exception as e:
+ if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn(
- "{%s} Sending request failed to %s: %s %s: %s",
- txn_id,
+ "DNS Lookup failed to %s with %s",
destination,
- method,
- url_bytes,
- _flatten_response_never_received(e),
+ e
)
+ log_result = "DNS Lookup failed to %s with %s" % (
+ destination, e
+ )
+ raise
+
+ logger.warn(
+ "{%s} Sending request failed to %s: %s %s: %s",
+ txn_id,
+ destination,
+ method,
+ url,
+ _flatten_response_never_received(e),
+ )
+
+ log_result = _flatten_response_never_received(e)
+
+ if retries_left and not timeout:
+ if long_retries:
+ delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
+ delay = min(delay, 60)
+ delay *= random.uniform(0.8, 1.4)
+ else:
+ delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
+ delay = min(delay, 2)
+ delay *= random.uniform(0.8, 1.4)
- log_result = _flatten_response_never_received(e)
-
- if retries_left and not timeout:
- if long_retries:
- delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
- delay = min(delay, 60)
- delay *= random.uniform(0.8, 1.4)
- else:
- delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
- delay = min(delay, 2)
- delay *= random.uniform(0.8, 1.4)
+ logger.debug(
+ "{%s} Waiting %s before sending to %s...",
+ txn_id,
+ delay,
+ destination
+ )
- yield self.clock.sleep(delay)
- retries_left -= 1
- else:
- raise
- finally:
- outbound_logger.info(
- "{%s} [%s] Result: %s",
- txn_id,
- destination,
- log_result,
- )
+ yield self.clock.sleep(delay)
+ retries_left -= 1
+ else:
+ raise
+ finally:
+ outbound_logger.info(
+ "{%s} [%s] Result: %s",
+ txn_id,
+ destination,
+ log_result,
+ )
if 200 <= response.code < 300:
pass
@@ -252,7 +304,9 @@ class MatrixFederationHttpClient(object):
# :'(
# Update transactions table?
with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
+ d = treq.content(response)
+ d.addTimeout(_sec_timeout, self.hs.get_reactor())
+ body = yield make_deferred_yieldable(d)
raise HttpResponseException(
response.code, response.phrase, body
)
@@ -297,11 +351,11 @@ class MatrixFederationHttpClient(object):
auth_headers = []
for key, sig in request["signatures"][self.server_name].items():
- auth_headers.append(bytes(
+ auth_headers.append((
"X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
self.server_name, key, sig,
- )
- ))
+ )).encode('ascii')
+ )
headers_dict[b"Authorization"] = auth_headers
@@ -347,24 +401,14 @@ class MatrixFederationHttpClient(object):
"""
if not json_data_callback:
- def json_data_callback():
- return data
-
- def body_callback(method, url_bytes, headers_dict):
- json_data = json_data_callback()
- self.sign_request(
- destination, method, url_bytes, headers_dict, json_data
- )
- producer = _JsonProducer(json_data)
- return producer
+ json_data_callback = lambda: data
response = yield self._request(
destination,
"PUT",
path,
- body_callback=body_callback,
- headers_dict={"Content-Type": ["application/json"]},
- query_bytes=encode_query_args(args),
+ json_callback=json_data_callback,
+ query=args,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -376,8 +420,10 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
- defer.returnValue(json.loads(body))
+ d = treq.json_content(response)
+ d.addTimeout(self.default_timeout, self.hs.get_reactor())
+ body = yield make_deferred_yieldable(d)
+ defer.returnValue(body)
@defer.inlineCallbacks
def post_json(self, destination, path, data={}, long_retries=False,
@@ -410,20 +456,12 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
-
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(
- destination, method, url_bytes, headers_dict, data
- )
- return _JsonProducer(data)
-
response = yield self._request(
destination,
"POST",
path,
- query_bytes=encode_query_args(args),
- body_callback=body_callback,
- headers_dict={"Content-Type": ["application/json"]},
+ query=args,
+ json=data,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -434,9 +472,16 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
+ d = treq.json_content(response)
+ if timeout:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
+ d.addTimeout(_sec_timeout, self.hs.get_reactor())
+ body = yield make_deferred_yieldable(d)
- defer.returnValue(json.loads(body))
+ defer.returnValue(body)
@defer.inlineCallbacks
def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
@@ -471,16 +516,11 @@ class MatrixFederationHttpClient(object):
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(destination, method, url_bytes, headers_dict)
- return None
-
response = yield self._request(
destination,
"GET",
path,
- query_bytes=encode_query_args(args),
- body_callback=body_callback,
+ query=args,
retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -491,9 +531,11 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
+ d = treq.json_content(response)
+ d.addTimeout(self.default_timeout, self.hs.get_reactor())
+ body = yield make_deferred_yieldable(d)
- defer.returnValue(json.loads(body))
+ defer.returnValue(body)
@defer.inlineCallbacks
def delete_json(self, destination, path, long_retries=False,
@@ -523,13 +565,11 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
-
response = yield self._request(
destination,
"DELETE",
path,
- query_bytes=encode_query_args(args),
- headers_dict={"Content-Type": ["application/json"]},
+ query=args,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -540,9 +580,11 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
+ d = treq.json_content(response)
+ d.addTimeout(self.default_timeout, self.hs.get_reactor())
+ body = yield make_deferred_yieldable(d)
- defer.returnValue(json.loads(body))
+ defer.returnValue(body)
@defer.inlineCallbacks
def get_file(self, destination, path, output_stream, args={},
@@ -569,26 +611,11 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
-
- encoded_args = {}
- for k, vs in args.items():
- if isinstance(vs, string_types):
- vs = [vs]
- encoded_args[k] = [v.encode("UTF-8") for v in vs]
-
- query_bytes = urllib.urlencode(encoded_args, True)
- logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
-
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(destination, method, url_bytes, headers_dict)
- return None
-
response = yield self._request(
destination,
"GET",
path,
- query_bytes=query_bytes,
- body_callback=body_callback,
+ query=args,
retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
)
@@ -597,9 +624,9 @@ class MatrixFederationHttpClient(object):
try:
with logcontext.PreserveLoggingContext():
- length = yield _readBodyToFile(
- response, output_stream, max_size
- )
+ d = _readBodyToFile(response, output_stream, max_size)
+ d.addTimeout(self.default_timeout, self.hs.get_reactor())
+ length = yield make_deferred_yieldable(d)
except Exception:
logger.exception("Failed to download body")
raise
@@ -639,30 +666,6 @@ def _readBodyToFile(response, stream, max_size):
return d
-class _JsonProducer(object):
- """ Used by the twisted http client to create the HTTP body from json
- """
- def __init__(self, jsn):
- self.reset(jsn)
-
- def reset(self, jsn):
- self.body = encode_canonical_json(jsn)
- self.length = len(self.body)
-
- def startProducing(self, consumer):
- consumer.write(self.body)
- return defer.succeed(None)
-
- def pauseProducing(self):
- pass
-
- def stopProducing(self):
- pass
-
- def resumeProducing(self):
- pass
-
-
def _flatten_response_never_received(e):
if hasattr(e, "reasons"):
reasons = ", ".join(
@@ -693,7 +696,7 @@ def check_content_type_is_json(headers):
"No Content-Type header"
)
- c_type = c_type[0] # only the first header
+ c_type = c_type[0].decode('ascii') # only the first header
val, options = cgi.parse_header(c_type)
if val != "application/json":
raise RuntimeError(
@@ -711,6 +714,6 @@ def encode_query_args(args):
vs = [vs]
encoded_args[k] = [v.encode("UTF-8") for v in vs]
- query_bytes = urllib.urlencode(encoded_args, True)
+ query_bytes = urllib.parse.urlencode(encoded_args, True)
- return query_bytes
+ return query_bytes.encode('utf8')
|