diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/http/matrixfederationclient.py | 158 |
1 files changed, 84 insertions, 74 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7f3d8fc884..ed47e701e7 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -35,11 +35,13 @@ from syutil.crypto.jsonsign import sign_json import simplejson as json import logging +import sys import urllib import urlparse logger = logging.getLogger(__name__) +outbound_logger = logging.getLogger("synapse.http.outbound") metrics = synapse.metrics.get_metrics_for(__name__) @@ -109,6 +111,8 @@ class MatrixFederationHttpClient(object): self.clock = hs.get_clock() self.version_string = hs.version_string + self._next_id = 1 + @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, body_callback, headers_dict={}, param_bytes=b"", @@ -123,88 +127,98 @@ class MatrixFederationHttpClient(object): ("", "", path_bytes, param_bytes, query_bytes, "",) ) - logger.info("Sending request to %s: %s %s", - destination, method, url_bytes) + txn_id = "%s-O-%s" % (method, self._next_id) + self._next_id = (self._next_id + 1) % (sys.maxint - 1) - logger.debug( - "Types: %s", - [ - type(destination), type(method), type(path_bytes), - type(param_bytes), - type(query_bytes) - ] + outbound_logger.info( + "{%s} [%s] Sending request: %s %s", + txn_id, destination, method, url_bytes ) # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) retries_left = 5 - endpoint = self._getEndpoint(reactor, destination) - - while True: - producer = None - if body_callback: - producer = body_callback(method, url_bytes, headers_dict) - - try: - request_deferred = preserve_context_over_fn( - self.agent.request, - destination, - endpoint, - method, - path_bytes, - param_bytes, - query_bytes, - Headers(headers_dict), - producer - ) + endpoint = preserve_context_over_fn( + self._getEndpoint, reactor, destination + ) - response = yield self.clock.time_bound_deferred( - request_deferred, - time_out=timeout/1000. if timeout else 60, - ) + log_result = None + try: + while True: + producer = None + if body_callback: + producer = body_callback(method, url_bytes, headers_dict) + + try: + def send_request(): + request_deferred = self.agent.request( + destination, + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer + ) + + return self.clock.time_bound_deferred( + request_deferred, + time_out=timeout/1000. if timeout else 60, + ) + + response = yield preserve_context_over_fn( + send_request, + ) + + log_result = "%d %s" % (response.code, response.phrase,) + break + except Exception as e: + if not retry_on_dns_fail and isinstance(e, DNSLookupError): + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) + log_result = "DNS Lookup failed to %s with %s" % ( + destination, e + ) + raise - logger.debug("Got response to %s", method) - break - except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): logger.warn( - "DNS Lookup failed to %s with %s", + "{%s} Sending request failed to %s: %s %s: %s - %s", + txn_id, destination, - e + method, + url_bytes, + type(e).__name__, + _flatten_response_never_received(e), ) - raise - - logger.warn( - "Sending request failed to %s: %s %s: %s - %s", - destination, - method, - url_bytes, - type(e).__name__, - _flatten_response_never_received(e), - ) - if retries_left and not timeout: - yield sleep(2 ** (5 - retries_left)) - retries_left -= 1 - else: - raise - - logger.info( - "Received response %d %s for %s: %s %s", - response.code, - response.phrase, - destination, - method, - url_bytes - ) + log_result = "%s - %s" % ( + type(e).__name__, _flatten_response_never_received(e), + ) + + if retries_left and not timeout: + yield sleep(2 ** (5 - retries_left)) + retries_left -= 1 + else: + raise + finally: + outbound_logger.info( + "{%s} [%s] Result: %s", + txn_id, + destination, + log_result, + ) if 200 <= response.code < 300: pass else: # :'( # Update transactions table? - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) raise HttpResponseException( response.code, response.phrase, body ) @@ -284,10 +298,7 @@ class MatrixFederationHttpClient(object): "Content-Type not application/json" ) - logger.debug("Getting resp body") - body = yield readBody(response) - logger.debug("Got resp body") - + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @defer.inlineCallbacks @@ -330,9 +341,7 @@ class MatrixFederationHttpClient(object): "Content-Type not application/json" ) - logger.debug("Getting resp body") - body = yield readBody(response) - logger.debug("Got resp body") + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @@ -390,9 +399,7 @@ class MatrixFederationHttpClient(object): "Content-Type not application/json" ) - logger.debug("Getting resp body") - body = yield readBody(response) - logger.debug("Got resp body") + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @@ -435,7 +442,10 @@ class MatrixFederationHttpClient(object): headers = dict(response.headers.getAllRawHeaders()) try: - length = yield _readBodyToFile(response, output_stream, max_size) + length = yield preserve_context_over_fn( + _readBodyToFile, + response, output_stream, max_size + ) except: logger.exception("Failed to download body") raise |