diff options
Diffstat (limited to 'synapse/http')
-rw-r--r-- | synapse/http/matrixfederationclient.py | 209 |
1 files changed, 123 insertions, 86 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 6a1fc8ca55..cf920bc041 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -177,11 +177,6 @@ class MatrixFederationHttpClient(object): txn_id = "%s-O-%s" % (method, self._next_id) self._next_id = (self._next_id + 1) % (MAXINT - 1) - outbound_logger.info( - "{%s} [%s] Sending request: %s %s", - txn_id, destination, method, url - ) - # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) if long_retries: @@ -194,85 +189,89 @@ class MatrixFederationHttpClient(object): ).decode('ascii') log_result = None - try: - while True: - try: - if json_callback: - json = json_callback() - - if json: - data = encode_canonical_json(json) - headers_dict["Content-Type"] = ["application/json"] - self.sign_request( - destination, method, http_url, headers_dict, json - ) - else: - data = None - self.sign_request(destination, method, http_url, headers_dict) - - request_deferred = treq.request( - method, - url, - headers=Headers(headers_dict), - data=data, - agent=self.agent, - ) - add_timeout_to_deferred( - request_deferred, - timeout / 1000. if timeout else 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, + while True: + try: + if json_callback: + json = json_callback() + + if json: + data = encode_canonical_json(json) + headers_dict["Content-Type"] = ["application/json"] + self.sign_request( + destination, method, http_url, headers_dict, json ) - response = yield make_deferred_yieldable( - request_deferred, - ) - - log_result = "%d %s" % (response.code, response.phrase,) - break - except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn( - "DNS Lookup failed to %s with %s", - destination, - e - ) - log_result = "DNS Lookup failed to %s with %s" % ( - destination, e - ) - raise - + else: + data = None + self.sign_request(destination, method, http_url, headers_dict) + + outbound_logger.info( + "{%s} [%s] Sending request: %s %s", + txn_id, destination, method, url + ) + + request_deferred = treq.request( + method, + url, + headers=Headers(headers_dict), + data=data, + agent=self.agent, + ) + add_timeout_to_deferred( + request_deferred, + timeout / 1000. if timeout else 60, + self.hs.get_reactor(), + cancelled_to_request_timed_out_error, + ) + response = yield make_deferred_yieldable( + request_deferred, + ) + + log_result = "%d %s" % (response.code, response.phrase,) + break + except Exception as e: + if not retry_on_dns_fail and isinstance(e, DNSLookupError): logger.warn( - "{%s} Sending request failed to %s: %s %s: %s", - txn_id, + "DNS Lookup failed to %s with %s", destination, - method, - url, - _flatten_response_never_received(e), + e ) - - log_result = _flatten_response_never_received(e) - - if retries_left and not timeout: - if long_retries: - delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) - delay = min(delay, 60) - delay *= random.uniform(0.8, 1.4) - else: - delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) - delay = min(delay, 2) - delay *= random.uniform(0.8, 1.4) - - yield self.clock.sleep(delay) - retries_left -= 1 + log_result = "DNS Lookup failed to %s with %s" % ( + destination, e + ) + raise + + logger.warn( + "{%s} Sending request failed to %s: %s %s: %s", + txn_id, + destination, + method, + url, + _flatten_response_never_received(e), + ) + + log_result = _flatten_response_never_received(e) + + if retries_left and not timeout: + if long_retries: + delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) + delay = min(delay, 60) + delay *= random.uniform(0.8, 1.4) else: - raise - finally: - outbound_logger.info( - "{%s} [%s] Result: %s", - txn_id, - destination, - log_result, - ) + delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) + delay = min(delay, 2) + delay *= random.uniform(0.8, 1.4) + + yield self.clock.sleep(delay) + retries_left -= 1 + else: + raise + finally: + outbound_logger.info( + "{%s} [%s] Result: %s", + txn_id, + destination, + log_result, + ) if 200 <= response.code < 300: pass @@ -280,7 +279,10 @@ class MatrixFederationHttpClient(object): # :'( # Update transactions table? with logcontext.PreserveLoggingContext(): - body = yield treq.content(response) + body = yield self._timeout_deferred( + treq.content(response), + timeout, + ) raise HttpResponseException( response.code, response.phrase, body ) @@ -394,7 +396,10 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -444,7 +449,10 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @@ -496,7 +504,10 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @@ -543,7 +554,10 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @@ -585,8 +599,10 @@ class MatrixFederationHttpClient(object): try: with logcontext.PreserveLoggingContext(): - length = yield _readBodyToFile( - response, output_stream, max_size + length = yield self._timeout_deferred( + _readBodyToFile( + response, output_stream, max_size + ), ) except Exception: logger.exception("Failed to download body") @@ -594,6 +610,27 @@ class MatrixFederationHttpClient(object): defer.returnValue((length, headers)) + def _timeout_deferred(self, deferred, timeout_ms=None): + """Times the deferred out after `timeout_ms` ms + + Args: + deferred (Deferred) + timeout_ms (int|None): Timeout in milliseconds. If None defaults + to 60 seconds. + + Returns: + Deferred + """ + + add_timeout_to_deferred( + deferred, + timeout_ms / 1000. if timeout_ms else 60, + self.hs.get_reactor(), + cancelled_to_request_timed_out_error, + ) + + return deferred + class _ReadBodyToFileProtocol(protocol.Protocol): def __init__(self, stream, deferred, max_size): |