summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py365
1 files changed, 184 insertions, 181 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b34bb8e31a..13b19f7626 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,19 +17,19 @@ import cgi
 import logging
 import random
 import sys
-import urllib
 
-from six import string_types
-from six.moves.urllib import parse as urlparse
+from six import PY3, string_types
+from six.moves import urllib
 
-from canonicaljson import encode_canonical_json, json
+import treq
+from canonicaljson import encode_canonical_json
 from prometheus_client import Counter
 from signedjson.sign import sign_json
 
-from twisted.internet import defer, protocol, reactor
+from twisted.internet import defer, protocol
 from twisted.internet.error import DNSLookupError
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool, readBody
+from twisted.web.client import Agent, HTTPConnectionPool
 from twisted.web.http_headers import Headers
 
 import synapse.metrics
@@ -40,11 +40,11 @@ from synapse.api.errors import (
     HttpResponseException,
     SynapseError,
 )
-from synapse.http import cancelled_to_request_timed_out_error
 from synapse.http.endpoint import matrix_federation_endpoint
 from synapse.util import logcontext
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_no_seriously
 from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 outbound_logger = logging.getLogger("synapse.http.outbound")
@@ -58,16 +58,22 @@ incoming_responses_counter = Counter("synapse_http_matrixfederationclient_respon
 MAX_LONG_RETRIES = 10
 MAX_SHORT_RETRIES = 3
 
+if PY3:
+    MAXINT = sys.maxsize
+else:
+    MAXINT = sys.maxint
+
 
 class MatrixFederationEndpointFactory(object):
     def __init__(self, hs):
+        self.reactor = hs.get_reactor()
         self.tls_client_options_factory = hs.tls_client_options_factory
 
     def endpointForURI(self, uri):
-        destination = uri.netloc
+        destination = uri.netloc.decode('ascii')
 
         return matrix_federation_endpoint(
-            reactor, destination, timeout=10,
+            self.reactor, destination, timeout=10,
             tls_client_options_factory=self.tls_client_options_factory
         )
 
@@ -85,7 +91,9 @@ class MatrixFederationHttpClient(object):
         self.hs = hs
         self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
+        reactor = hs.get_reactor()
         pool = HTTPConnectionPool(reactor)
+        pool.retryAutomatically = False
         pool.maxPersistentPerHost = 5
         pool.cachedConnectionTimeout = 2 * 60
         self.agent = Agent.usingEndpointFactory(
@@ -93,26 +101,33 @@ class MatrixFederationHttpClient(object):
         )
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
-        self.version_string = hs.version_string
+        self.version_string = hs.version_string.encode('ascii')
         self._next_id = 1
+        self.default_timeout = 60
 
     def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
-        return urlparse.urlunparse(
-            ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
+        return urllib.parse.urlunparse(
+            (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
         )
 
     @defer.inlineCallbacks
     def _request(self, destination, method, path,
-                 body_callback, headers_dict={}, param_bytes=b"",
-                 query_bytes=b"", retry_on_dns_fail=True,
+                 json=None, json_callback=None,
+                 param_bytes=b"",
+                 query=None, retry_on_dns_fail=True,
                  timeout=None, long_retries=False,
                  ignore_backoff=False,
                  backoff_on_404=False):
-        """ Creates and sends a request to the given server
+        """
+        Creates and sends a request to the given server.
+
         Args:
             destination (str): The remote server to send the HTTP request to.
             method (str): HTTP method
             path (str): The HTTP path
+            json (dict or None): JSON to send in the body.
+            json_callback (func or None): A callback to generate the JSON.
+            query (dict or None): Query arguments.
             ignore_backoff (bool): true to ignore the historical backoff data
                 and try the request anyway.
             backoff_on_404 (bool): Back off if we get a 404
@@ -132,6 +147,11 @@ class MatrixFederationHttpClient(object):
             (May also fail with plenty of other Exceptions for things like DNS
                 failures, connection failures, SSL failures.)
         """
+        if timeout:
+            _sec_timeout = timeout / 1000
+        else:
+            _sec_timeout = self.default_timeout
+
         if (
             self.hs.config.federation_domain_whitelist is not None and
             destination not in self.hs.config.federation_domain_whitelist
@@ -146,23 +166,25 @@ class MatrixFederationHttpClient(object):
             ignore_backoff=ignore_backoff,
         )
 
-        destination = destination.encode("ascii")
+        headers_dict = {}
         path_bytes = path.encode("ascii")
-        with limiter:
-            headers_dict[b"User-Agent"] = [self.version_string]
-            headers_dict[b"Host"] = [destination]
+        if query:
+            query_bytes = encode_query_args(query)
+        else:
+            query_bytes = b""
 
-            url_bytes = self._create_url(
-                destination, path_bytes, param_bytes, query_bytes
-            )
+        headers_dict = {
+            "User-Agent": [self.version_string],
+            "Host": [destination],
+        }
 
-            txn_id = "%s-O-%s" % (method, self._next_id)
-            self._next_id = (self._next_id + 1) % (sys.maxint - 1)
+        with limiter:
+            url = self._create_url(
+                destination.encode("ascii"), path_bytes, param_bytes, query_bytes
+            ).decode('ascii')
 
-            outbound_logger.info(
-                "{%s} [%s] Sending request: %s %s",
-                txn_id, destination, method, url_bytes
-            )
+            txn_id = "%s-O-%s" % (method, self._next_id)
+            self._next_id = (self._next_id + 1) % (MAXINT - 1)
 
             # XXX: Would be much nicer to retry only at the transaction-layer
             # (once we have reliable transactions in place)
@@ -171,80 +193,110 @@ class MatrixFederationHttpClient(object):
             else:
                 retries_left = MAX_SHORT_RETRIES
 
-            http_url_bytes = urlparse.urlunparse(
-                ("", "", path_bytes, param_bytes, query_bytes, "")
-            )
+            http_url = urllib.parse.urlunparse(
+                (b"", b"", path_bytes, param_bytes, query_bytes, b"")
+            ).decode('ascii')
 
             log_result = None
-            try:
-                while True:
-                    producer = None
-                    if body_callback:
-                        producer = body_callback(method, http_url_bytes, headers_dict)
-
-                    try:
-                        request_deferred = self.agent.request(
-                            method,
-                            url_bytes,
-                            Headers(headers_dict),
-                            producer
-                        )
-                        add_timeout_to_deferred(
-                            request_deferred,
-                            timeout / 1000. if timeout else 60,
-                            self.hs.get_reactor(),
-                            cancelled_to_request_timed_out_error,
+            while True:
+                try:
+                    if json_callback:
+                        json = json_callback()
+
+                    if json:
+                        data = encode_canonical_json(json)
+                        headers_dict["Content-Type"] = ["application/json"]
+                        self.sign_request(
+                            destination, method, http_url, headers_dict, json
                         )
+                    else:
+                        data = None
+                        self.sign_request(destination, method, http_url, headers_dict)
+
+                    outbound_logger.info(
+                        "{%s} [%s] Sending request: %s %s",
+                        txn_id, destination, method, url
+                    )
+
+                    request_deferred = treq.request(
+                        method,
+                        url,
+                        headers=Headers(headers_dict),
+                        data=data,
+                        agent=self.agent,
+                        reactor=self.hs.get_reactor(),
+                        unbuffered=True
+                    )
+                    request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
+
+                    # Sometimes the timeout above doesn't work, so lets hack yet
+                    # another layer of timeouts in in the vain hope that at some
+                    # point the world made sense and this really really really
+                    # should work.
+                    request_deferred = timeout_no_seriously(
+                        request_deferred,
+                        timeout=_sec_timeout * 2,
+                        reactor=self.hs.get_reactor(),
+                    )
+
+                    with Measure(self.clock, "outbound_request"):
                         response = yield make_deferred_yieldable(
                             request_deferred,
                         )
 
-                        log_result = "%d %s" % (response.code, response.phrase,)
-                        break
-                    except Exception as e:
-                        if not retry_on_dns_fail and isinstance(e, DNSLookupError):
-                            logger.warn(
-                                "DNS Lookup failed to %s with %s",
-                                destination,
-                                e
-                            )
-                            log_result = "DNS Lookup failed to %s with %s" % (
-                                destination, e
-                            )
-                            raise
-
+                    log_result = "%d %s" % (response.code, response.phrase,)
+                    break
+                except Exception as e:
+                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
                         logger.warn(
-                            "{%s} Sending request failed to %s: %s %s: %s",
-                            txn_id,
+                            "DNS Lookup failed to %s with %s",
                             destination,
-                            method,
-                            url_bytes,
-                            _flatten_response_never_received(e),
+                            e
                         )
+                        log_result = "DNS Lookup failed to %s with %s" % (
+                            destination, e
+                        )
+                        raise
+
+                    logger.warn(
+                        "{%s} Sending request failed to %s: %s %s: %s",
+                        txn_id,
+                        destination,
+                        method,
+                        url,
+                        _flatten_response_never_received(e),
+                    )
+
+                    log_result = _flatten_response_never_received(e)
+
+                    if retries_left and not timeout:
+                        if long_retries:
+                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
+                            delay = min(delay, 60)
+                            delay *= random.uniform(0.8, 1.4)
+                        else:
+                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
+                            delay = min(delay, 2)
+                            delay *= random.uniform(0.8, 1.4)
 
-                        log_result = _flatten_response_never_received(e)
-
-                        if retries_left and not timeout:
-                            if long_retries:
-                                delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
-                                delay = min(delay, 60)
-                                delay *= random.uniform(0.8, 1.4)
-                            else:
-                                delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
-                                delay = min(delay, 2)
-                                delay *= random.uniform(0.8, 1.4)
+                        logger.debug(
+                            "{%s} Waiting %s before sending to %s...",
+                            txn_id,
+                            delay,
+                            destination
+                        )
 
-                            yield self.clock.sleep(delay)
-                            retries_left -= 1
-                        else:
-                            raise
-            finally:
-                outbound_logger.info(
-                    "{%s} [%s] Result: %s",
-                    txn_id,
-                    destination,
-                    log_result,
-                )
+                        yield self.clock.sleep(delay)
+                        retries_left -= 1
+                    else:
+                        raise
+                finally:
+                    outbound_logger.info(
+                        "{%s} [%s] Result: %s",
+                        txn_id,
+                        destination,
+                        log_result,
+                    )
 
             if 200 <= response.code < 300:
                 pass
@@ -252,7 +304,9 @@ class MatrixFederationHttpClient(object):
                 # :'(
                 # Update transactions table?
                 with logcontext.PreserveLoggingContext():
-                    body = yield readBody(response)
+                    d = treq.content(response)
+                    d.addTimeout(_sec_timeout, self.hs.get_reactor())
+                    body = yield make_deferred_yieldable(d)
                 raise HttpResponseException(
                     response.code, response.phrase, body
                 )
@@ -297,11 +351,11 @@ class MatrixFederationHttpClient(object):
         auth_headers = []
 
         for key, sig in request["signatures"][self.server_name].items():
-            auth_headers.append(bytes(
+            auth_headers.append((
                 "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
                     self.server_name, key, sig,
-                )
-            ))
+                )).encode('ascii')
+            )
 
         headers_dict[b"Authorization"] = auth_headers
 
@@ -347,24 +401,14 @@ class MatrixFederationHttpClient(object):
         """
 
         if not json_data_callback:
-            def json_data_callback():
-                return data
-
-        def body_callback(method, url_bytes, headers_dict):
-            json_data = json_data_callback()
-            self.sign_request(
-                destination, method, url_bytes, headers_dict, json_data
-            )
-            producer = _JsonProducer(json_data)
-            return producer
+            json_data_callback = lambda: data
 
         response = yield self._request(
             destination,
             "PUT",
             path,
-            body_callback=body_callback,
-            headers_dict={"Content-Type": ["application/json"]},
-            query_bytes=encode_query_args(args),
+            json_callback=json_data_callback,
+            query=args,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -376,8 +420,10 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
-        defer.returnValue(json.loads(body))
+            d = treq.json_content(response)
+            d.addTimeout(self.default_timeout, self.hs.get_reactor())
+            body = yield make_deferred_yieldable(d)
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def post_json(self, destination, path, data={}, long_retries=False,
@@ -410,20 +456,12 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(
-                destination, method, url_bytes, headers_dict, data
-            )
-            return _JsonProducer(data)
-
         response = yield self._request(
             destination,
             "POST",
             path,
-            query_bytes=encode_query_args(args),
-            body_callback=body_callback,
-            headers_dict={"Content-Type": ["application/json"]},
+            query=args,
+            json=data,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -434,9 +472,16 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
+            d = treq.json_content(response)
+            if timeout:
+                _sec_timeout = timeout / 1000
+            else:
+                _sec_timeout = self.default_timeout
+
+            d.addTimeout(_sec_timeout, self.hs.get_reactor())
+            body = yield make_deferred_yieldable(d)
 
-        defer.returnValue(json.loads(body))
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
@@ -471,16 +516,11 @@ class MatrixFederationHttpClient(object):
 
         logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
 
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(destination, method, url_bytes, headers_dict)
-            return None
-
         response = yield self._request(
             destination,
             "GET",
             path,
-            query_bytes=encode_query_args(args),
-            body_callback=body_callback,
+            query=args,
             retry_on_dns_fail=retry_on_dns_fail,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -491,9 +531,11 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
+            d = treq.json_content(response)
+            d.addTimeout(self.default_timeout, self.hs.get_reactor())
+            body = yield make_deferred_yieldable(d)
 
-        defer.returnValue(json.loads(body))
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def delete_json(self, destination, path, long_retries=False,
@@ -523,13 +565,11 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-
         response = yield self._request(
             destination,
             "DELETE",
             path,
-            query_bytes=encode_query_args(args),
-            headers_dict={"Content-Type": ["application/json"]},
+            query=args,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -540,9 +580,11 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
+            d = treq.json_content(response)
+            d.addTimeout(self.default_timeout, self.hs.get_reactor())
+            body = yield make_deferred_yieldable(d)
 
-        defer.returnValue(json.loads(body))
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def get_file(self, destination, path, output_stream, args={},
@@ -569,26 +611,11 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-
-        encoded_args = {}
-        for k, vs in args.items():
-            if isinstance(vs, string_types):
-                vs = [vs]
-            encoded_args[k] = [v.encode("UTF-8") for v in vs]
-
-        query_bytes = urllib.urlencode(encoded_args, True)
-        logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
-
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(destination, method, url_bytes, headers_dict)
-            return None
-
         response = yield self._request(
             destination,
             "GET",
             path,
-            query_bytes=query_bytes,
-            body_callback=body_callback,
+            query=args,
             retry_on_dns_fail=retry_on_dns_fail,
             ignore_backoff=ignore_backoff,
         )
@@ -597,9 +624,9 @@ class MatrixFederationHttpClient(object):
 
         try:
             with logcontext.PreserveLoggingContext():
-                length = yield _readBodyToFile(
-                    response, output_stream, max_size
-                )
+                d = _readBodyToFile(response, output_stream, max_size)
+                d.addTimeout(self.default_timeout, self.hs.get_reactor())
+                length = yield make_deferred_yieldable(d)
         except Exception:
             logger.exception("Failed to download body")
             raise
@@ -639,30 +666,6 @@ def _readBodyToFile(response, stream, max_size):
     return d
 
 
-class _JsonProducer(object):
-    """ Used by the twisted http client to create the HTTP body from json
-    """
-    def __init__(self, jsn):
-        self.reset(jsn)
-
-    def reset(self, jsn):
-        self.body = encode_canonical_json(jsn)
-        self.length = len(self.body)
-
-    def startProducing(self, consumer):
-        consumer.write(self.body)
-        return defer.succeed(None)
-
-    def pauseProducing(self):
-        pass
-
-    def stopProducing(self):
-        pass
-
-    def resumeProducing(self):
-        pass
-
-
 def _flatten_response_never_received(e):
     if hasattr(e, "reasons"):
         reasons = ", ".join(
@@ -693,7 +696,7 @@ def check_content_type_is_json(headers):
             "No Content-Type header"
         )
 
-    c_type = c_type[0]  # only the first header
+    c_type = c_type[0].decode('ascii')  # only the first header
     val, options = cgi.parse_header(c_type)
     if val != "application/json":
         raise RuntimeError(
@@ -711,6 +714,6 @@ def encode_query_args(args):
             vs = [vs]
         encoded_args[k] = [v.encode("UTF-8") for v in vs]
 
-    query_bytes = urllib.urlencode(encoded_args, True)
+    query_bytes = urllib.parse.urlencode(encoded_args, True)
 
-    return query_bytes
+    return query_bytes.encode('utf8')