summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-09-06 09:53:37 +0100
committerErik Johnston <erik@matrix.org>2019-09-06 09:53:37 +0100
commit68f53b7a0e923593750e118dc9cb7842226a8b3c (patch)
treeca42cadddb70cf03448eee510eb555c5e05142da
parentMerge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes (diff)
parentTrace how long it takes for the send trasaction to complete, including retrys... (diff)
downloadsynapse-68f53b7a0e923593750e118dc9cb7842226a8b3c.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
-rw-r--r--changelog.d/5986.feature1
-rw-r--r--changelog.d/5988.bugfix1
-rw-r--r--synapse/http/client.py87
-rw-r--r--synapse/http/matrixfederationclient.py1
-rw-r--r--synapse/logging/opentracing.py82
-rw-r--r--synapse/replication/http/_base.py7
6 files changed, 117 insertions, 62 deletions
diff --git a/changelog.d/5986.feature b/changelog.d/5986.feature
new file mode 100644
index 0000000000..f56aec1b32
--- /dev/null
+++ b/changelog.d/5986.feature
@@ -0,0 +1 @@
+Trace replication send times.
diff --git a/changelog.d/5988.bugfix b/changelog.d/5988.bugfix
new file mode 100644
index 0000000000..5c3597cb53
--- /dev/null
+++ b/changelog.d/5988.bugfix
@@ -0,0 +1 @@
+Fix invalid references to None while opentracing if the log context slips.
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 0ae6db8ea7..51765ae3c0 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -46,6 +46,7 @@ from synapse.http import (
     redact_uri,
 )
 from synapse.logging.context import make_deferred_yieldable
+from synapse.logging.opentracing import set_tag, start_active_span, tags
 from synapse.util.async_helpers import timeout_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
 
@@ -269,42 +270,56 @@ class SimpleHttpClient(object):
         # log request but strip `access_token` (AS requests for example include this)
         logger.info("Sending request %s %s", method, redact_uri(uri))
 
-        try:
-            body_producer = None
-            if data is not None:
-                body_producer = QuieterFileBodyProducer(BytesIO(data))
-
-            request_deferred = treq.request(
-                method,
-                uri,
-                agent=self.agent,
-                data=body_producer,
-                headers=headers,
-                **self._extra_treq_args
-            )
-            request_deferred = timeout_deferred(
-                request_deferred,
-                60,
-                self.hs.get_reactor(),
-                cancelled_to_request_timed_out_error,
-            )
-            response = yield make_deferred_yieldable(request_deferred)
-
-            incoming_responses_counter.labels(method, response.code).inc()
-            logger.info(
-                "Received response to %s %s: %s", method, redact_uri(uri), response.code
-            )
-            return response
-        except Exception as e:
-            incoming_responses_counter.labels(method, "ERR").inc()
-            logger.info(
-                "Error sending request to  %s %s: %s %s",
-                method,
-                redact_uri(uri),
-                type(e).__name__,
-                e.args[0],
-            )
-            raise
+        with start_active_span(
+            "outgoing-client-request",
+            tags={
+                tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
+                tags.HTTP_METHOD: method,
+                tags.HTTP_URL: uri,
+            },
+            finish_on_close=True,
+        ):
+            try:
+                body_producer = None
+                if data is not None:
+                    body_producer = QuieterFileBodyProducer(BytesIO(data))
+
+                request_deferred = treq.request(
+                    method,
+                    uri,
+                    agent=self.agent,
+                    data=body_producer,
+                    headers=headers,
+                    **self._extra_treq_args
+                )
+                request_deferred = timeout_deferred(
+                    request_deferred,
+                    60,
+                    self.hs.get_reactor(),
+                    cancelled_to_request_timed_out_error,
+                )
+                response = yield make_deferred_yieldable(request_deferred)
+
+                incoming_responses_counter.labels(method, response.code).inc()
+                logger.info(
+                    "Received response to %s %s: %s",
+                    method,
+                    redact_uri(uri),
+                    response.code,
+                )
+                return response
+            except Exception as e:
+                incoming_responses_counter.labels(method, "ERR").inc()
+                logger.info(
+                    "Error sending request to  %s %s: %s %s",
+                    method,
+                    redact_uri(uri),
+                    type(e).__name__,
+                    e.args[0],
+                )
+                set_tag(tags.ERROR, True)
+                set_tag("error_reason", e.args[0])
+                raise
 
     @defer.inlineCallbacks
     def post_urlencoded_get_json(self, uri, args={}, headers=None):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 4326e98a28..3f7c93ffcb 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -345,7 +345,6 @@ class MatrixFederationHttpClient(object):
         else:
             query_bytes = b""
 
-        # Retreive current span
         scope = start_active_span(
             "outgoing-federation-request",
             tags={
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 2c34b54702..8c574ddd28 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -239,8 +239,7 @@ _homeserver_whitelist = None
 
 
 def only_if_tracing(func):
-    """Executes the function only if we're tracing. Otherwise return.
-    Assumes the function wrapped may return None"""
+    """Executes the function only if we're tracing. Otherwise returns None."""
 
     @wraps(func)
     def _only_if_tracing_inner(*args, **kwargs):
@@ -252,6 +251,41 @@ def only_if_tracing(func):
     return _only_if_tracing_inner
 
 
+def ensure_active_span(message, ret=None):
+    """Executes the operation only if opentracing is enabled and there is an active span.
+    If there is no active span it logs message at the error level.
+
+    Args:
+        message (str): Message which fills in "There was no active span when trying to %s"
+            in the error log if there is no active span and opentracing is enabled.
+        ret (object): return value if opentracing is None or there is no active span.
+
+    Returns (object): The result of the func or ret if opentracing is disabled or there
+        was no active span.
+    """
+
+    def ensure_active_span_inner_1(func):
+        @wraps(func)
+        def ensure_active_span_inner_2(*args, **kwargs):
+            if not opentracing:
+                return ret
+
+            if not opentracing.tracer.active_span:
+                logger.error(
+                    "There was no active span when trying to %s."
+                    " Did you forget to start one or did a context slip?",
+                    message,
+                )
+
+                return ret
+
+            return func(*args, **kwargs)
+
+        return ensure_active_span_inner_2
+
+    return ensure_active_span_inner_1
+
+
 @contextlib.contextmanager
 def _noop_context_manager(*args, **kwargs):
     """Does exactly what it says on the tin"""
@@ -349,26 +383,24 @@ def start_active_span(
     if opentracing is None:
         return _noop_context_manager()
 
-    else:
-        # We need to enter the scope here for the logcontext to become active
-        return opentracing.tracer.start_active_span(
-            operation_name,
-            child_of=child_of,
-            references=references,
-            tags=tags,
-            start_time=start_time,
-            ignore_active_span=ignore_active_span,
-            finish_on_close=finish_on_close,
-        )
+    return opentracing.tracer.start_active_span(
+        operation_name,
+        child_of=child_of,
+        references=references,
+        tags=tags,
+        start_time=start_time,
+        ignore_active_span=ignore_active_span,
+        finish_on_close=finish_on_close,
+    )
 
 
 def start_active_span_follows_from(operation_name, contexts):
     if opentracing is None:
         return _noop_context_manager()
-    else:
-        references = [opentracing.follows_from(context) for context in contexts]
-        scope = start_active_span(operation_name, references=references)
-        return scope
+
+    references = [opentracing.follows_from(context) for context in contexts]
+    scope = start_active_span(operation_name, references=references)
+    return scope
 
 
 def start_active_span_from_request(
@@ -465,19 +497,19 @@ def start_active_span_from_edu(
 # Opentracing setters for tags, logs, etc
 
 
-@only_if_tracing
+@ensure_active_span("set a tag")
 def set_tag(key, value):
     """Sets a tag on the active span"""
     opentracing.tracer.active_span.set_tag(key, value)
 
 
-@only_if_tracing
+@ensure_active_span("log")
 def log_kv(key_values, timestamp=None):
     """Log to the active span"""
     opentracing.tracer.active_span.log_kv(key_values, timestamp)
 
 
-@only_if_tracing
+@ensure_active_span("set the traces operation name")
 def set_operation_name(operation_name):
     """Sets the operation name of the active span"""
     opentracing.tracer.active_span.set_operation_name(operation_name)
@@ -486,7 +518,7 @@ def set_operation_name(operation_name):
 # Injection and extraction
 
 
-@only_if_tracing
+@ensure_active_span("inject the span into a header")
 def inject_active_span_twisted_headers(headers, destination, check_destination=True):
     """
     Injects a span context into twisted headers in-place
@@ -522,7 +554,7 @@ def inject_active_span_twisted_headers(headers, destination, check_destination=T
         headers.addRawHeaders(key, value)
 
 
-@only_if_tracing
+@ensure_active_span("inject the span into a byte dict")
 def inject_active_span_byte_dict(headers, destination, check_destination=True):
     """
     Injects a span context into a dict where the headers are encoded as byte
@@ -559,7 +591,7 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
         headers[key.encode()] = [value.encode()]
 
 
-@only_if_tracing
+@ensure_active_span("inject the span into a text map")
 def inject_active_span_text_map(carrier, destination, check_destination=True):
     """
     Injects a span context into a dict
@@ -591,6 +623,7 @@ def inject_active_span_text_map(carrier, destination, check_destination=True):
     )
 
 
+@ensure_active_span("get the active span context as a dict", ret={})
 def get_active_span_text_map(destination=None):
     """
     Gets a span context as a dict. This can be used instead of manually
@@ -603,7 +636,7 @@ def get_active_span_text_map(destination=None):
         dict: the active span's context if opentracing is enabled, otherwise empty.
     """
 
-    if not opentracing or (destination and not whitelisted_homeserver(destination)):
+    if destination and not whitelisted_homeserver(destination):
         return {}
 
     carrier = {}
@@ -614,6 +647,7 @@ def get_active_span_text_map(destination=None):
     return carrier
 
 
+@ensure_active_span("get the span context as a string.", ret={})
 def active_span_context_as_string():
     """
     Returns:
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index afc9a8ff29..03560c1f0e 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -28,7 +28,11 @@ from synapse.api.errors import (
     RequestSendFailed,
     SynapseError,
 )
-from synapse.logging.opentracing import inject_active_span_byte_dict, trace_servlet
+from synapse.logging.opentracing import (
+    inject_active_span_byte_dict,
+    trace,
+    trace_servlet,
+)
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.stringutils import random_string
 
@@ -129,6 +133,7 @@ class ReplicationEndpoint(object):
 
         client = hs.get_simple_http_client()
 
+        @trace(opname="outgoing_replication_request")
         @defer.inlineCallbacks
         def send_request(**kwargs):
             data = yield cls._serialize_payload(**kwargs)