From 1bf83a191bc2b202db5c85eb972469cb27aefd09 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 9 Jun 2021 11:33:00 +0100 Subject: Clean up the interface for injecting opentracing over HTTP (#10143) * Remove unused helper functions * Clean up the interface for injecting opentracing over HTTP * changelog --- synapse/logging/opentracing.py | 102 ++++++++--------------------------------- 1 file changed, 19 insertions(+), 83 deletions(-) (limited to 'synapse/logging/opentracing.py') diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index dd9377340e..5b4725e035 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -168,7 +168,7 @@ import inspect import logging import re from functools import wraps -from typing import TYPE_CHECKING, Dict, Optional, Pattern, Type +from typing import TYPE_CHECKING, Dict, List, Optional, Pattern, Type import attr @@ -574,59 +574,22 @@ def set_operation_name(operation_name): # Injection and extraction -@ensure_active_span("inject the span into a header") -def inject_active_span_twisted_headers(headers, destination, check_destination=True): +@ensure_active_span("inject the span into a header dict") +def inject_header_dict( + headers: Dict[bytes, List[bytes]], + destination: Optional[str] = None, + check_destination: bool = True, +) -> None: """ - Injects a span context into twisted headers in-place + Injects a span context into a dict of HTTP headers Args: - headers (twisted.web.http_headers.Headers) - destination (str): address of entity receiving the span context. If check_destination - is true the context will only be injected if the destination matches the - opentracing whitelist + headers: the dict to inject headers into + destination: address of entity receiving the span context. Must be given unless + check_destination is False. 
The context will only be injected if the + destination matches the opentracing whitelist check_destination (bool): If false, destination will be ignored and the context will always be injected. - span (opentracing.Span) - - Returns: - In-place modification of headers - - Note: - The headers set by the tracer are custom to the tracer implementation which - should be unique enough that they don't interfere with any headers set by - synapse or twisted. If we're still using jaeger these headers would be those - here: - https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py - """ - - if check_destination and not whitelisted_homeserver(destination): - return - - span = opentracing.tracer.active_span - carrier = {} # type: Dict[str, str] - opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier) - - for key, value in carrier.items(): - headers.addRawHeaders(key, value) - - -@ensure_active_span("inject the span into a byte dict") -def inject_active_span_byte_dict(headers, destination, check_destination=True): - """ - Injects a span context into a dict where the headers are encoded as byte - strings - - Args: - headers (dict) - destination (str): address of entity receiving the span context. If check_destination - is true the context will only be injected if the destination matches the - opentracing whitelist - check_destination (bool): If false, destination will be ignored and the context - will always be injected. 
- span (opentracing.Span) - - Returns: - In-place modification of headers Note: The headers set by the tracer are custom to the tracer implementation which @@ -635,8 +598,13 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True): here: https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py """ - if check_destination and not whitelisted_homeserver(destination): - return + if check_destination: + if destination is None: + raise ValueError( + "destination must be given unless check_destination is False" + ) + if not whitelisted_homeserver(destination): + return span = opentracing.tracer.active_span @@ -647,38 +615,6 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True): headers[key.encode()] = [value.encode()] -@ensure_active_span("inject the span into a text map") -def inject_active_span_text_map(carrier, destination, check_destination=True): - """ - Injects a span context into a dict - - Args: - carrier (dict) - destination (str): address of entity receiving the span context. If check_destination - is true the context will only be injected if the destination matches the - opentracing whitelist - check_destination (bool): If false, destination will be ignored and the context - will always be injected. - - Returns: - In-place modification of carrier - - Note: - The headers set by the tracer are custom to the tracer implementation which - should be unique enough that they don't interfere with any headers set by - synapse or twisted. 
If we're still using jaeger these headers would be those - here: - https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py - """ - - if check_destination and not whitelisted_homeserver(destination): - return - - opentracing.tracer.inject( - opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier - ) - - @ensure_active_span("get the active span context as a dict", ret={}) def get_active_span_text_map(destination=None): """ -- cgit 1.5.1 From 9e405034e59569c00916a87f643d879a286a7a34 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 16 Jun 2021 11:41:15 +0100 Subject: Make opentracing trace into event persistence (#10134) * Trace event persistence When we persist a batch of events, set the parent opentracing span to that from the request, so that we can trace all the way in. * changelog * When we force tracing, set a baggage item ... so that we can check again later. * Link in both directions between persist_events spans --- changelog.d/10134.misc | 1 + synapse/api/auth.py | 4 +-- synapse/logging/opentracing.py | 57 +++++++++++++++++++++++++++++++++++++-- synapse/storage/persist_events.py | 46 +++++++++++++++++++++++++++---- 4 files changed, 99 insertions(+), 9 deletions(-) create mode 100644 changelog.d/10134.misc (limited to 'synapse/logging/opentracing.py') diff --git a/changelog.d/10134.misc b/changelog.d/10134.misc new file mode 100644 index 0000000000..ce9702645d --- /dev/null +++ b/changelog.d/10134.misc @@ -0,0 +1 @@ +Improve OpenTracing for event persistence. 
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 26a3b38918..cf4333a923 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -207,7 +207,7 @@ class Auth: request.requester = user_id if user_id in self._force_tracing_for_users: - opentracing.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1) + opentracing.force_tracing() opentracing.set_tag("authenticated_entity", user_id) opentracing.set_tag("user_id", user_id) opentracing.set_tag("appservice_id", app_service.id) @@ -260,7 +260,7 @@ class Auth: request.requester = requester if user_info.token_owner in self._force_tracing_for_users: - opentracing.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1) + opentracing.force_tracing() opentracing.set_tag("authenticated_entity", user_info.token_owner) opentracing.set_tag("user_id", user_info.user_id) if device_id: diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 5b4725e035..4f18792c99 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -168,7 +168,7 @@ import inspect import logging import re from functools import wraps -from typing import TYPE_CHECKING, Dict, List, Optional, Pattern, Type +from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Pattern, Type import attr @@ -278,6 +278,10 @@ class SynapseTags: DB_TXN_ID = "db.txn_id" +class SynapseBaggage: + FORCE_TRACING = "synapse-force-tracing" + + # Block everything by default # A regex which matches the server_names to expose traces for. # None means 'block everything'. @@ -285,6 +289,8 @@ _homeserver_whitelist = None # type: Optional[Pattern[str]] # Util methods +Sentinel = object() + def only_if_tracing(func): """Executes the function only if we're tracing. 
Otherwise returns None.""" @@ -447,12 +453,28 @@ def start_active_span( ) -def start_active_span_follows_from(operation_name, contexts): +def start_active_span_follows_from( + operation_name: str, contexts: Collection, inherit_force_tracing=False +): + """Starts an active opentracing span, with additional references to previous spans + + Args: + operation_name: name of the operation represented by the new span + contexts: the previous spans to inherit from + inherit_force_tracing: if set, and any of the previous contexts have had tracing + forced, the new span will also have tracing forced. + """ if opentracing is None: return noop_context_manager() references = [opentracing.follows_from(context) for context in contexts] scope = start_active_span(operation_name, references=references) + + if inherit_force_tracing and any( + is_context_forced_tracing(ctx) for ctx in contexts + ): + force_tracing(scope.span) + return scope @@ -551,6 +573,10 @@ def start_active_span_from_edu( # Opentracing setters for tags, logs, etc +@only_if_tracing +def active_span(): + """Get the currently active span, if any""" + return opentracing.tracer.active_span @ensure_active_span("set a tag") @@ -571,6 +597,33 @@ def set_operation_name(operation_name): opentracing.tracer.active_span.set_operation_name(operation_name) +@only_if_tracing +def force_tracing(span=Sentinel) -> None: + """Force sampling for the active/given span and its children. + + Args: + span: span to force tracing for. By default, the active span. 
+ """ + if span is Sentinel: + span = opentracing.tracer.active_span + if span is None: + logger.error("No active span in force_tracing") + return + + span.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1) + + # also set a bit of baggage, so that we have a way of figuring out if + # it is enabled later + span.set_baggage_item(SynapseBaggage.FORCE_TRACING, "1") + + +def is_context_forced_tracing(span_context) -> bool: + """Check if sampling has been force for the given span context.""" + if span_context is None: + return False + return span_context.baggage.get(SynapseBaggage.FORCE_TRACING) is not None + + # Injection and extraction diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index c11f6c5845..dc38942bb1 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -18,6 +18,7 @@ import itertools import logging from collections import deque from typing import ( + Any, Awaitable, Callable, Collection, @@ -40,6 +41,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.logging import opentracing from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.databases import Databases @@ -103,12 +105,18 @@ times_pruned_extremities = Counter( ) -@attr.s(auto_attribs=True, frozen=True, slots=True) +@attr.s(auto_attribs=True, slots=True) class _EventPersistQueueItem: events_and_contexts: List[Tuple[EventBase, EventContext]] backfilled: bool deferred: ObservableDeferred + parent_opentracing_span_contexts: List = [] + """A list of opentracing spans waiting for this batch""" + + opentracing_span_context: Any = None + """The opentracing span under which the persistence actually happened""" + _PersistResult = TypeVar("_PersistResult") @@ -171,9 +179,27 @@ 
class _EventPeristenceQueue(Generic[_PersistResult]): ) queue.append(end_item) + # add our events to the queue item end_item.events_and_contexts.extend(events_and_contexts) + + # also add our active opentracing span to the item so that we get a link back + span = opentracing.active_span() + if span: + end_item.parent_opentracing_span_contexts.append(span.context) + + # start a processor for the queue, if there isn't one already self._handle_queue(room_id) - return await make_deferred_yieldable(end_item.deferred.observe()) + + # wait for the queue item to complete + res = await make_deferred_yieldable(end_item.deferred.observe()) + + # add another opentracing span which links to the persist trace. + with opentracing.start_active_span_follows_from( + "persist_event_batch_complete", (end_item.opentracing_span_context,) + ): + pass + + return res def _handle_queue(self, room_id): """Attempts to handle the queue for a room if not already being handled. @@ -200,9 +226,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]): queue = self._get_drainining_queue(room_id) for item in queue: try: - ret = await self._per_item_callback( - item.events_and_contexts, item.backfilled - ) + with opentracing.start_active_span_follows_from( + "persist_event_batch", + item.parent_opentracing_span_contexts, + inherit_force_tracing=True, + ) as scope: + if scope: + item.opentracing_span_context = scope.span.context + + ret = await self._per_item_callback( + item.events_and_contexts, item.backfilled + ) except Exception: with PreserveLoggingContext(): item.deferred.errback() @@ -252,6 +286,7 @@ class EventsPersistenceStorage: self._event_persist_queue = _EventPeristenceQueue(self._persist_event_batch) self._state_resolution_handler = hs.get_state_resolution_handler() + @opentracing.trace async def persist_events( self, events_and_contexts: Iterable[Tuple[EventBase, EventContext]], @@ -307,6 +342,7 @@ class EventsPersistenceStorage: self.main_store.get_room_max_token(), ) + 
@opentracing.trace async def persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]: -- cgit 1.5.1 From 91fa9cca99f7cd1ba96baaf3f2c1b5c045dd1a7c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 18 Jun 2021 11:43:22 +0100 Subject: Expose opentracing trace id in response headers (#10199) Fixes: #9480 --- changelog.d/10199.misc | 1 + synapse/federation/transport/server.py | 3 +++ synapse/logging/opentracing.py | 21 +++++++++++++++++++++ 3 files changed, 25 insertions(+) create mode 100644 changelog.d/10199.misc (limited to 'synapse/logging/opentracing.py') diff --git a/changelog.d/10199.misc b/changelog.d/10199.misc new file mode 100644 index 0000000000..69b18aeacc --- /dev/null +++ b/changelog.d/10199.misc @@ -0,0 +1 @@ +Expose opentracing trace id in response headers. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 16d740cf58..bed47f8abd 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -35,6 +35,7 @@ from synapse.http.servlet import ( parse_string_from_args, parse_strings_from_args, ) +from synapse.logging import opentracing from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( SynapseTags, @@ -345,6 +346,8 @@ class BaseFederationServlet: ) with scope: + opentracing.inject_response_headers(request.responseHeaders) + if origin and self.RATELIMIT: with ratelimiter.ratelimit(origin) as d: await d diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 4f18792c99..140ed711e3 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -173,6 +173,7 @@ from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Pattern, Typ import attr from twisted.internet import defer +from twisted.web.http_headers import Headers from 
synapse.config import ConfigError from synapse.util import json_decoder, json_encoder @@ -668,6 +669,25 @@ def inject_header_dict( headers[key.encode()] = [value.encode()] +def inject_response_headers(response_headers: Headers) -> None: + """Inject the current trace id into the HTTP response headers""" + if not opentracing: + return + span = opentracing.tracer.active_span + if not span: + return + + # This is a bit implementation-specific. + # + # Jaeger's Spans have a trace_id property; other implementations (including the + # dummy opentracing.span.Span which we use if init_tracer is not called) do not + # expose it + trace_id = getattr(span, "trace_id", None) + + if trace_id is not None: + response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}") + + @ensure_active_span("get the active span context as a dict", ret={}) def get_active_span_text_map(destination=None): """ @@ -843,6 +863,7 @@ def trace_servlet(request: "SynapseRequest", extract_context: bool = False): scope = start_active_span(request_name) with scope: + inject_response_headers(request.responseHeaders) try: yield finally: -- cgit 1.5.1