diff --git a/changelog.d/5986.feature b/changelog.d/5986.feature
new file mode 100644
index 0000000000..f56aec1b32
--- /dev/null
+++ b/changelog.d/5986.feature
@@ -0,0 +1 @@
+Trace replication send times.
diff --git a/changelog.d/5988.bugfix b/changelog.d/5988.bugfix
new file mode 100644
index 0000000000..5c3597cb53
--- /dev/null
+++ b/changelog.d/5988.bugfix
@@ -0,0 +1 @@
+Fix invalid references to None while opentracing if the log context slips.
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 0ae6db8ea7..51765ae3c0 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -46,6 +46,7 @@ from synapse.http import (
redact_uri,
)
from synapse.logging.context import make_deferred_yieldable
+from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
@@ -269,42 +270,56 @@ class SimpleHttpClient(object):
# log request but strip `access_token` (AS requests for example include this)
logger.info("Sending request %s %s", method, redact_uri(uri))
- try:
- body_producer = None
- if data is not None:
- body_producer = QuieterFileBodyProducer(BytesIO(data))
-
- request_deferred = treq.request(
- method,
- uri,
- agent=self.agent,
- data=body_producer,
- headers=headers,
- **self._extra_treq_args
- )
- request_deferred = timeout_deferred(
- request_deferred,
- 60,
- self.hs.get_reactor(),
- cancelled_to_request_timed_out_error,
- )
- response = yield make_deferred_yieldable(request_deferred)
-
- incoming_responses_counter.labels(method, response.code).inc()
- logger.info(
- "Received response to %s %s: %s", method, redact_uri(uri), response.code
- )
- return response
- except Exception as e:
- incoming_responses_counter.labels(method, "ERR").inc()
- logger.info(
- "Error sending request to %s %s: %s %s",
- method,
- redact_uri(uri),
- type(e).__name__,
- e.args[0],
- )
- raise
+ with start_active_span(
+ "outgoing-client-request",
+ tags={
+ tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
+ tags.HTTP_METHOD: method,
+ tags.HTTP_URL: uri,
+ },
+ finish_on_close=True,
+ ):
+ try:
+ body_producer = None
+ if data is not None:
+ body_producer = QuieterFileBodyProducer(BytesIO(data))
+
+ request_deferred = treq.request(
+ method,
+ uri,
+ agent=self.agent,
+ data=body_producer,
+ headers=headers,
+ **self._extra_treq_args
+ )
+ request_deferred = timeout_deferred(
+ request_deferred,
+ 60,
+ self.hs.get_reactor(),
+ cancelled_to_request_timed_out_error,
+ )
+ response = yield make_deferred_yieldable(request_deferred)
+
+ incoming_responses_counter.labels(method, response.code).inc()
+ logger.info(
+ "Received response to %s %s: %s",
+ method,
+ redact_uri(uri),
+ response.code,
+ )
+ return response
+ except Exception as e:
+ incoming_responses_counter.labels(method, "ERR").inc()
+ logger.info(
+ "Error sending request to %s %s: %s %s",
+ method,
+ redact_uri(uri),
+ type(e).__name__,
+ e.args[0],
+ )
+ set_tag(tags.ERROR, True)
+ set_tag("error_reason", e.args[0])
+ raise
@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}, headers=None):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 4326e98a28..3f7c93ffcb 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -345,7 +345,6 @@ class MatrixFederationHttpClient(object):
else:
query_bytes = b""
- # Retreive current span
scope = start_active_span(
"outgoing-federation-request",
tags={
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 2c34b54702..8c574ddd28 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -239,8 +239,7 @@ _homeserver_whitelist = None
def only_if_tracing(func):
- """Executes the function only if we're tracing. Otherwise return.
- Assumes the function wrapped may return None"""
+ """Executes the function only if we're tracing. Otherwise returns None."""
@wraps(func)
def _only_if_tracing_inner(*args, **kwargs):
@@ -252,6 +251,41 @@ def only_if_tracing(func):
return _only_if_tracing_inner
+def ensure_active_span(message, ret=None):
+ """Executes the operation only if opentracing is enabled and there is an active span.
+ If there is no active span it logs message at the error level.
+
+ Args:
+ message (str): Message which fills in "There was no active span when trying to %s"
+ in the error log if there is no active span and opentracing is enabled.
+ ret (object): return value if opentracing is None or there is no active span.
+
+ Returns (object): The result of the func or ret if opentracing is disabled or there
+ was no active span.
+ """
+
+ def ensure_active_span_inner_1(func):
+ @wraps(func)
+ def ensure_active_span_inner_2(*args, **kwargs):
+ if not opentracing:
+ return ret
+
+ if not opentracing.tracer.active_span:
+ logger.error(
+ "There was no active span when trying to %s."
+ " Did you forget to start one or did a context slip?",
+ message,
+ )
+
+ return ret
+
+ return func(*args, **kwargs)
+
+ return ensure_active_span_inner_2
+
+ return ensure_active_span_inner_1
+
+
@contextlib.contextmanager
def _noop_context_manager(*args, **kwargs):
"""Does exactly what it says on the tin"""
@@ -349,26 +383,24 @@ def start_active_span(
if opentracing is None:
return _noop_context_manager()
- else:
- # We need to enter the scope here for the logcontext to become active
- return opentracing.tracer.start_active_span(
- operation_name,
- child_of=child_of,
- references=references,
- tags=tags,
- start_time=start_time,
- ignore_active_span=ignore_active_span,
- finish_on_close=finish_on_close,
- )
+ return opentracing.tracer.start_active_span(
+ operation_name,
+ child_of=child_of,
+ references=references,
+ tags=tags,
+ start_time=start_time,
+ ignore_active_span=ignore_active_span,
+ finish_on_close=finish_on_close,
+ )
def start_active_span_follows_from(operation_name, contexts):
if opentracing is None:
return _noop_context_manager()
- else:
- references = [opentracing.follows_from(context) for context in contexts]
- scope = start_active_span(operation_name, references=references)
- return scope
+
+ references = [opentracing.follows_from(context) for context in contexts]
+ scope = start_active_span(operation_name, references=references)
+ return scope
def start_active_span_from_request(
@@ -465,19 +497,19 @@ def start_active_span_from_edu(
# Opentracing setters for tags, logs, etc
-@only_if_tracing
+@ensure_active_span("set a tag")
def set_tag(key, value):
"""Sets a tag on the active span"""
opentracing.tracer.active_span.set_tag(key, value)
-@only_if_tracing
+@ensure_active_span("log")
def log_kv(key_values, timestamp=None):
"""Log to the active span"""
opentracing.tracer.active_span.log_kv(key_values, timestamp)
-@only_if_tracing
+@ensure_active_span("set the traces operation name")
def set_operation_name(operation_name):
"""Sets the operation name of the active span"""
opentracing.tracer.active_span.set_operation_name(operation_name)
@@ -486,7 +518,7 @@ def set_operation_name(operation_name):
# Injection and extraction
-@only_if_tracing
+@ensure_active_span("inject the span into a header")
def inject_active_span_twisted_headers(headers, destination, check_destination=True):
"""
Injects a span context into twisted headers in-place
@@ -522,7 +554,7 @@ def inject_active_span_twisted_headers(headers, destination, check_destination=T
headers.addRawHeaders(key, value)
-@only_if_tracing
+@ensure_active_span("inject the span into a byte dict")
def inject_active_span_byte_dict(headers, destination, check_destination=True):
"""
Injects a span context into a dict where the headers are encoded as byte
@@ -559,7 +591,7 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
headers[key.encode()] = [value.encode()]
-@only_if_tracing
+@ensure_active_span("inject the span into a text map")
def inject_active_span_text_map(carrier, destination, check_destination=True):
"""
Injects a span context into a dict
@@ -591,6 +623,7 @@ def inject_active_span_text_map(carrier, destination, check_destination=True):
)
+@ensure_active_span("get the active span context as a dict", ret={})
def get_active_span_text_map(destination=None):
"""
Gets a span context as a dict. This can be used instead of manually
@@ -603,7 +636,7 @@ def get_active_span_text_map(destination=None):
dict: the active span's context if opentracing is enabled, otherwise empty.
"""
- if not opentracing or (destination and not whitelisted_homeserver(destination)):
+ if destination and not whitelisted_homeserver(destination):
return {}
carrier = {}
@@ -614,6 +647,7 @@ def get_active_span_text_map(destination=None):
return carrier
+@ensure_active_span("get the span context as a string.", ret={})
def active_span_context_as_string():
"""
Returns:
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index afc9a8ff29..03560c1f0e 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -28,7 +28,11 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
-from synapse.logging.opentracing import inject_active_span_byte_dict, trace_servlet
+from synapse.logging.opentracing import (
+ inject_active_span_byte_dict,
+ trace,
+ trace_servlet,
+)
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
@@ -129,6 +133,7 @@ class ReplicationEndpoint(object):
client = hs.get_simple_http_client()
+ @trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
def send_request(**kwargs):
data = yield cls._serialize_payload(**kwargs)
|