summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/transport/server/_base.py39
-rw-r--r--synapse/http/site.py30
-rw-r--r--synapse/logging/opentracing.py68
3 files changed, 64 insertions, 73 deletions
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index dc39e3537b..da1fbf8b63 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -22,13 +22,11 @@ from synapse.api.urls import FEDERATION_V1_PREFIX
 from synapse.http.server import HttpServer, ServletCallback
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.http.site import SynapseRequest
-from synapse.logging import opentracing
 from synapse.logging.context import run_in_background
 from synapse.logging.opentracing import (
-    SynapseTags,
-    start_active_span,
-    start_active_span_from_request,
-    tags,
+    set_tag,
+    span_context_from_request,
+    start_active_span_follows_from,
     whitelisted_homeserver,
 )
 from synapse.server import HomeServer
@@ -279,30 +277,19 @@ class BaseFederationServlet:
                 logger.warning("authenticate_request failed: %s", e)
                 raise
 
-            request_tags = {
-                SynapseTags.REQUEST_ID: request.get_request_id(),
-                tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
-                tags.HTTP_METHOD: request.get_method(),
-                tags.HTTP_URL: request.get_redacted_uri(),
-                tags.PEER_HOST_IPV6: request.getClientIP(),
-                "authenticated_entity": origin,
-                "servlet_name": request.request_metrics.name,
-            }
-
-            # Only accept the span context if the origin is authenticated
-            # and whitelisted
+            # update the active opentracing span with the authenticated entity
+            set_tag("authenticated_entity", origin)
+
+            # if the origin is authenticated and whitelisted, link to its span context
+            context = None
             if origin and whitelisted_homeserver(origin):
-                scope = start_active_span_from_request(
-                    request, "incoming-federation-request", tags=request_tags
-                )
-            else:
-                scope = start_active_span(
-                    "incoming-federation-request", tags=request_tags
-                )
+                context = span_context_from_request(request)
 
-            with scope:
-                opentracing.inject_response_headers(request.responseHeaders)
+            scope = start_active_span_follows_from(
+                "incoming-federation-request", contexts=(context,) if context else ()
+            )
 
+            with scope:
                 if origin and self.RATELIMIT:
                     with ratelimiter.ratelimit(origin) as d:
                         await d
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 9f68d7e191..80f7a2ff58 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -14,7 +14,7 @@
 import contextlib
 import logging
 import time
-from typing import Any, Generator, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Generator, Optional, Tuple, Union
 
 import attr
 from zope.interface import implementer
@@ -35,6 +35,9 @@ from synapse.logging.context import (
 )
 from synapse.types import Requester
 
+if TYPE_CHECKING:
+    import opentracing
+
 logger = logging.getLogger(__name__)
 
 _next_request_seq = 0
@@ -81,6 +84,10 @@ class SynapseRequest(Request):
         # server name, for client requests this is the Requester object.
         self._requester: Optional[Union[Requester, str]] = None
 
+        # An opentracing span for this request. Will be closed when the request is
+        # completely processed.
+        self._opentracing_span: "Optional[opentracing.Span]" = None
+
         # we can't yet create the logcontext, as we don't know the method.
         self.logcontext: Optional[LoggingContext] = None
 
@@ -148,6 +155,13 @@ class SynapseRequest(Request):
         # If there's no authenticated entity, it was the requester.
         self.logcontext.request.authenticated_entity = authenticated_entity or requester
 
+    def set_opentracing_span(self, span: "opentracing.Span") -> None:
+        """attach an opentracing span to this request
+
+        Doing so will cause the span to be closed when we finish processing the request
+        """
+        self._opentracing_span = span
+
     def get_request_id(self) -> str:
         return "%s-%i" % (self.get_method(), self.request_seq)
 
@@ -286,6 +300,9 @@ class SynapseRequest(Request):
             self._processing_finished_time = time.time()
             self._is_processing = False
 
+            if self._opentracing_span:
+                self._opentracing_span.log_kv({"event": "finished processing"})
+
             # if we've already sent the response, log it now; otherwise, we wait for the
             # response to be sent.
             if self.finish_time is not None:
@@ -299,6 +316,8 @@ class SynapseRequest(Request):
         """
         self.finish_time = time.time()
         Request.finish(self)
+        if self._opentracing_span:
+            self._opentracing_span.log_kv({"event": "response sent"})
         if not self._is_processing:
             assert self.logcontext is not None
             with PreserveLoggingContext(self.logcontext):
@@ -333,6 +352,11 @@ class SynapseRequest(Request):
         with PreserveLoggingContext(self.logcontext):
             logger.info("Connection from client lost before response was sent")
 
+            if self._opentracing_span:
+                self._opentracing_span.log_kv(
+                    {"event": "client connection lost", "reason": str(reason.value)}
+                )
+
             if not self._is_processing:
                 self._finished_processing()
 
@@ -421,6 +445,10 @@ class SynapseRequest(Request):
             usage.evt_db_fetch_count,
         )
 
+        # complete the opentracing span, if any.
+        if self._opentracing_span:
+            self._opentracing_span.finish()
+
         try:
             self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
         except Exception as e:
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 5d93ab07f1..6364290615 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -173,6 +173,7 @@ from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Pattern, Typ
 import attr
 
 from twisted.internet import defer
+from twisted.web.http import Request
 from twisted.web.http_headers import Headers
 
 from synapse.config import ConfigError
@@ -490,48 +491,6 @@ def start_active_span_follows_from(
     return scope
 
 
-def start_active_span_from_request(
-    request,
-    operation_name,
-    references=None,
-    tags=None,
-    start_time=None,
-    ignore_active_span=False,
-    finish_on_close=True,
-):
-    """
-    Extracts a span context from a Twisted Request.
-    args:
-        headers (twisted.web.http.Request)
-
-        For the other args see opentracing.tracer
-
-    returns:
-        span_context (opentracing.span.SpanContext)
-    """
-    # Twisted encodes the values as lists whereas opentracing doesn't.
-    # So, we take the first item in the list.
-    # Also, twisted uses byte arrays while opentracing expects strings.
-
-    if opentracing is None:
-        return noop_context_manager()  # type: ignore[unreachable]
-
-    header_dict = {
-        k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
-    }
-    context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
-
-    return opentracing.tracer.start_active_span(
-        operation_name,
-        child_of=context,
-        references=references,
-        tags=tags,
-        start_time=start_time,
-        ignore_active_span=ignore_active_span,
-        finish_on_close=finish_on_close,
-    )
-
-
 def start_active_span_from_edu(
     edu_content,
     operation_name,
@@ -743,6 +702,20 @@ def active_span_context_as_string():
     return json_encoder.encode(carrier)
 
 
+def span_context_from_request(request: Request) -> "Optional[opentracing.SpanContext]":
+    """Extract an opentracing context from the headers on an HTTP request
+
+    This is useful when we have received an HTTP request from another part of our
+    system, and want to link our spans to those of the remote system.
+    """
+    if not opentracing:
+        return None
+    header_dict = {
+        k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
+    }
+    return opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
+
+
 @only_if_tracing
 def span_context_from_string(carrier):
     """
@@ -882,10 +855,13 @@ def trace_servlet(request: "SynapseRequest", extract_context: bool = False):
     }
 
     request_name = request.request_metrics.name
-    if extract_context:
-        scope = start_active_span_from_request(request, request_name)
-    else:
-        scope = start_active_span(request_name)
+    context = span_context_from_request(request) if extract_context else None
+
+    # we configure the scope not to finish the span immediately on exit, and instead
+    # pass the span into the SynapseRequest, which will finish it once we've finished
+    # sending the response to the client.
+    scope = start_active_span(request_name, child_of=context, finish_on_close=False)
+    request.set_opentracing_span(scope.span)
 
     with scope:
         inject_response_headers(request.responseHeaders)