summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-08-12 17:07:22 +0100
committerMark Haines <mark.haines@matrix.org>2015-08-12 17:21:14 +0100
commit998a72d4d9ec6e73000888dcdf51437ec427fbee (patch)
treef8fa9d5deb820b49eb3a216e194ee83e14fb9eda /synapse/http/matrixfederationclient.py
parentBump the version of twisted needed for setup_requires to 15.2.1 (diff)
parentMerge pull request #220 from matrix-org/markjh/generate_keys (diff)
downloadsynapse-998a72d4d9ec6e73000888dcdf51437ec427fbee.tar.xz
Merge branch 'develop' into markjh/twisted-15
Conflicts:
	synapse/http/matrixfederationclient.py
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py145
1 files changed, 77 insertions, 68 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index ec5b06ddca..4d74bd5d78 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -35,11 +35,13 @@ from syutil.crypto.jsonsign import sign_json
 
 import simplejson as json
 import logging
+import sys
 import urllib
 import urlparse
 
 
 logger = logging.getLogger(__name__)
+outbound_logger = logging.getLogger("synapse.http.outbound")
 
 metrics = synapse.metrics.get_metrics_for(__name__)
 
@@ -86,6 +88,7 @@ class MatrixFederationHttpClient(object):
         )
         self.clock = hs.get_clock()
         self.version_string = hs.version_string
+        self._next_id = 1
 
     def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
         return urlparse.urlunparse(
@@ -106,16 +109,12 @@ class MatrixFederationHttpClient(object):
             destination, path_bytes, param_bytes, query_bytes
         )
 
-        logger.info("Sending request to %s: %s %s",
-                    destination, method, url_bytes)
+        txn_id = "%s-O-%s" % (method, self._next_id)
+        self._next_id = (self._next_id + 1) % (sys.maxint - 1)
 
-        logger.debug(
-            "Types: %s",
-            [
-                type(destination), type(method), type(path_bytes),
-                type(param_bytes),
-                type(query_bytes)
-            ]
+        outbound_logger.info(
+            "{%s} [%s] Sending request: %s %s",
+            txn_id, destination, method, url_bytes
         )
 
         # XXX: Would be much nicer to retry only at the transaction-layer
@@ -126,66 +125,80 @@ class MatrixFederationHttpClient(object):
             ("", "", path_bytes, param_bytes, query_bytes, "")
         )
 
-        while True:
-            producer = None
-            if body_callback:
-                producer = body_callback(method, http_url_bytes, headers_dict)
-
-            try:
-                request_deferred = preserve_context_over_fn(
-                    self.agent.request,
-                    method,
-                    url_bytes,
-                    Headers(headers_dict),
-                    producer
-                )
+        log_result = None
+        try:
+            while True:
+                producer = None
+                if body_callback:
+                    producer = body_callback(method, http_url_bytes, headers_dict)
+
+                try:
+                    def send_request():
+                        request_deferred = preserve_context_over_fn(
+                            self.agent.request,
+                            method,
+                            url_bytes,
+                            Headers(headers_dict),
+                            producer
+                        )
+
+
+                        return self.clock.time_bound_deferred(
+                            request_deferred,
+                            time_out=timeout/1000. if timeout else 60,
+                        )
+
+                    response = yield preserve_context_over_fn(
+                        send_request,
+                    )
 
-                response = yield self.clock.time_bound_deferred(
-                    request_deferred,
-                    time_out=timeout/1000. if timeout else 60,
-                )
+                    log_result = "%d %s" % (response.code, response.phrase,)
+                    break
+                except Exception as e:
+                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+                        logger.warn(
+                            "DNS Lookup failed to %s with %s",
+                            destination,
+                            e
+                        )
+                        log_result = "DNS Lookup failed to %s with %s" % (
+                            destination, e
+                        )
+                        raise
 
-                logger.debug("Got response to %s", method)
-                break
-            except Exception as e:
-                if not retry_on_dns_fail and isinstance(e, DNSLookupError):
                     logger.warn(
-                        "DNS Lookup failed to %s with %s",
+                        "{%s} Sending request failed to %s: %s %s: %s - %s",
+                        txn_id,
                         destination,
-                        e
+                        method,
+                        url_bytes,
+                        type(e).__name__,
+                        _flatten_response_never_received(e),
                     )
-                    raise
-
-                logger.warn(
-                    "Sending request failed to %s: %s %s: %s - %s",
-                    destination,
-                    method,
-                    url_bytes,
-                    type(e).__name__,
-                    _flatten_response_never_received(e),
-                )
 
-                if retries_left and not timeout:
-                    yield sleep(2 ** (5 - retries_left))
-                    retries_left -= 1
-                else:
-                    raise
-
-        logger.info(
-            "Received response %d %s for %s: %s %s",
-            response.code,
-            response.phrase,
-            destination,
-            method,
-            url_bytes
-        )
+                    log_result = "%s - %s" % (
+                        type(e).__name__, _flatten_response_never_received(e),
+                    )
+
+                    if retries_left and not timeout:
+                        yield sleep(2 ** (5 - retries_left))
+                        retries_left -= 1
+                    else:
+                        raise
+        finally:
+            outbound_logger.info(
+                "{%s} [%s] Result: %s",
+                txn_id,
+                destination,
+                log_result,
+            )
 
         if 200 <= response.code < 300:
             pass
         else:
             # :'(
             # Update transactions table?
-            body = yield readBody(response)
+            body = yield preserve_context_over_fn(readBody, response)
             raise HttpResponseException(
                 response.code, response.phrase, body
             )
@@ -265,10 +278,7 @@ class MatrixFederationHttpClient(object):
                     "Content-Type not application/json"
                 )
 
-        logger.debug("Getting resp body")
-        body = yield readBody(response)
-        logger.debug("Got resp body")
-
+        body = yield preserve_context_over_fn(readBody, response)
         defer.returnValue(json.loads(body))
 
     @defer.inlineCallbacks
@@ -311,9 +321,7 @@ class MatrixFederationHttpClient(object):
                     "Content-Type not application/json"
                 )
 
-        logger.debug("Getting resp body")
-        body = yield readBody(response)
-        logger.debug("Got resp body")
+        body = yield preserve_context_over_fn(readBody, response)
 
         defer.returnValue(json.loads(body))
 
@@ -371,9 +379,7 @@ class MatrixFederationHttpClient(object):
                     "Content-Type not application/json"
                 )
 
-        logger.debug("Getting resp body")
-        body = yield readBody(response)
-        logger.debug("Got resp body")
+        body = yield preserve_context_over_fn(readBody, response)
 
         defer.returnValue(json.loads(body))
 
@@ -416,7 +422,10 @@ class MatrixFederationHttpClient(object):
         headers = dict(response.headers.getAllRawHeaders())
 
         try:
-            length = yield _readBodyToFile(response, output_stream, max_size)
+            length = yield preserve_context_over_fn(
+                _readBodyToFile,
+                response, output_stream, max_size
+            )
         except:
             logger.exception("Failed to download body")
             raise