summary refs log tree commit diff
path: root/synapse/http
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http')
-rw-r--r--synapse/http/matrixfederationclient.py209
1 files changed, 123 insertions, 86 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 6a1fc8ca55..cf920bc041 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -177,11 +177,6 @@ class MatrixFederationHttpClient(object):
             txn_id = "%s-O-%s" % (method, self._next_id)
             self._next_id = (self._next_id + 1) % (MAXINT - 1)
 
-            outbound_logger.info(
-                "{%s} [%s] Sending request: %s %s",
-                txn_id, destination, method, url
-            )
-
             # XXX: Would be much nicer to retry only at the transaction-layer
             # (once we have reliable transactions in place)
             if long_retries:
@@ -194,85 +189,89 @@ class MatrixFederationHttpClient(object):
             ).decode('ascii')
 
             log_result = None
-            try:
-                while True:
-                    try:
-                        if json_callback:
-                            json = json_callback()
-
-                        if json:
-                            data = encode_canonical_json(json)
-                            headers_dict["Content-Type"] = ["application/json"]
-                            self.sign_request(
-                                destination, method, http_url, headers_dict, json
-                            )
-                        else:
-                            data = None
-                            self.sign_request(destination, method, http_url, headers_dict)
-
-                        request_deferred = treq.request(
-                            method,
-                            url,
-                            headers=Headers(headers_dict),
-                            data=data,
-                            agent=self.agent,
-                        )
-                        add_timeout_to_deferred(
-                            request_deferred,
-                            timeout / 1000. if timeout else 60,
-                            self.hs.get_reactor(),
-                            cancelled_to_request_timed_out_error,
+            while True:
+                try:
+                    if json_callback:
+                        json = json_callback()
+
+                    if json:
+                        data = encode_canonical_json(json)
+                        headers_dict["Content-Type"] = ["application/json"]
+                        self.sign_request(
+                            destination, method, http_url, headers_dict, json
                         )
-                        response = yield make_deferred_yieldable(
-                            request_deferred,
-                        )
-
-                        log_result = "%d %s" % (response.code, response.phrase,)
-                        break
-                    except Exception as e:
-                        if not retry_on_dns_fail and isinstance(e, DNSLookupError):
-                            logger.warn(
-                                "DNS Lookup failed to %s with %s",
-                                destination,
-                                e
-                            )
-                            log_result = "DNS Lookup failed to %s with %s" % (
-                                destination, e
-                            )
-                            raise
-
+                    else:
+                        data = None
+                        self.sign_request(destination, method, http_url, headers_dict)
+
+                    outbound_logger.info(
+                        "{%s} [%s] Sending request: %s %s",
+                        txn_id, destination, method, url
+                    )
+
+                    request_deferred = treq.request(
+                        method,
+                        url,
+                        headers=Headers(headers_dict),
+                        data=data,
+                        agent=self.agent,
+                    )
+                    add_timeout_to_deferred(
+                        request_deferred,
+                        timeout / 1000. if timeout else 60,
+                        self.hs.get_reactor(),
+                        cancelled_to_request_timed_out_error,
+                    )
+                    response = yield make_deferred_yieldable(
+                        request_deferred,
+                    )
+
+                    log_result = "%d %s" % (response.code, response.phrase,)
+                    break
+                except Exception as e:
+                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
                         logger.warn(
-                            "{%s} Sending request failed to %s: %s %s: %s",
-                            txn_id,
+                            "DNS Lookup failed to %s with %s",
                             destination,
-                            method,
-                            url,
-                            _flatten_response_never_received(e),
+                            e
                         )
-
-                        log_result = _flatten_response_never_received(e)
-
-                        if retries_left and not timeout:
-                            if long_retries:
-                                delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
-                                delay = min(delay, 60)
-                                delay *= random.uniform(0.8, 1.4)
-                            else:
-                                delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
-                                delay = min(delay, 2)
-                                delay *= random.uniform(0.8, 1.4)
-
-                            yield self.clock.sleep(delay)
-                            retries_left -= 1
+                        log_result = "DNS Lookup failed to %s with %s" % (
+                            destination, e
+                        )
+                        raise
+
+                    logger.warn(
+                        "{%s} Sending request failed to %s: %s %s: %s",
+                        txn_id,
+                        destination,
+                        method,
+                        url,
+                        _flatten_response_never_received(e),
+                    )
+
+                    log_result = _flatten_response_never_received(e)
+
+                    if retries_left and not timeout:
+                        if long_retries:
+                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
+                            delay = min(delay, 60)
+                            delay *= random.uniform(0.8, 1.4)
                         else:
-                            raise
-            finally:
-                outbound_logger.info(
-                    "{%s} [%s] Result: %s",
-                    txn_id,
-                    destination,
-                    log_result,
-                )
+                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
+                            delay = min(delay, 2)
+                            delay *= random.uniform(0.8, 1.4)
+
+                        yield self.clock.sleep(delay)
+                        retries_left -= 1
+                    else:
+                        raise
+                finally:
+                    outbound_logger.info(
+                        "{%s} [%s] Result: %s",
+                        txn_id,
+                        destination,
+                        log_result,
+                    )
 
             if 200 <= response.code < 300:
                 pass
@@ -280,7 +279,10 @@ class MatrixFederationHttpClient(object):
                 # :'(
                 # Update transactions table?
                 with logcontext.PreserveLoggingContext():
-                    body = yield treq.content(response)
+                    body = yield self._timeout_deferred(
+                        treq.content(response),
+                        timeout,
+                    )
                 raise HttpResponseException(
                     response.code, response.phrase, body
                 )
@@ -394,7 +396,10 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield treq.json_content(response)
+            body = yield self._timeout_deferred(
+                treq.json_content(response),
+                timeout,
+            )
         defer.returnValue(body)
 
     @defer.inlineCallbacks
@@ -444,7 +449,10 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield treq.json_content(response)
+            body = yield self._timeout_deferred(
+                treq.json_content(response),
+                timeout,
+            )
 
         defer.returnValue(body)
 
@@ -496,7 +504,10 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield treq.json_content(response)
+            body = yield self._timeout_deferred(
+                treq.json_content(response),
+                timeout,
+            )
 
         defer.returnValue(body)
 
@@ -543,7 +554,10 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield treq.json_content(response)
+            body = yield self._timeout_deferred(
+                treq.json_content(response),
+                timeout,
+            )
 
         defer.returnValue(body)
 
@@ -585,8 +599,10 @@ class MatrixFederationHttpClient(object):
 
         try:
             with logcontext.PreserveLoggingContext():
-                length = yield _readBodyToFile(
-                    response, output_stream, max_size
+                length = yield self._timeout_deferred(
+                    _readBodyToFile(
+                        response, output_stream, max_size
+                    ),
                 )
         except Exception:
             logger.exception("Failed to download body")
@@ -594,6 +610,27 @@ class MatrixFederationHttpClient(object):
 
         defer.returnValue((length, headers))
 
+    def _timeout_deferred(self, deferred, timeout_ms=None):
+        """Times the deferred out after `timeout_ms` ms
+
+        Args:
+            deferred (Deferred)
+            timeout_ms (int|None): Timeout in milliseconds. If None defaults
+                to 60 seconds.
+
+        Returns:
+            Deferred
+        """
+
+        add_timeout_to_deferred(
+            deferred,
+            timeout_ms / 1000. if timeout_ms else 60,
+            self.hs.get_reactor(),
+            cancelled_to_request_timed_out_error,
+        )
+
+        return deferred
+
 
 class _ReadBodyToFileProtocol(protocol.Protocol):
     def __init__(self, stream, deferred, max_size):