# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # NOTE This is a small wrapper around opentelemetry because tracing is optional # and not always packaged downstream. Since opentelemetry instrumentation is # fairly invasive it was awkward to make it optional. As a result we opted to # encapsulate all opentelemetry state in these methods which effectively noop if # opentelemetry is not present. We should strongly consider encouraging the # downstream distributers to package opentelemetry and making opentelemetry a # full dependency. In order to facilitate this move the methods have work very # similarly to opentelemetry's and it should only be a matter of few regexes to # move over to opentelemetry's access patterns proper. """ ============================ Using OpenTelemetry in Synapse ============================ Python-specific tracing concepts are at https://opentelemetry.io/docs/instrumentation/python/. Note that Synapse wraps OpenTelemetry in a small module (this one) in order to make the OpenTelemetry dependency optional. That means that some access patterns are different to those demonstrated in the OpenTelemetry guides. However, it is still useful to know, especially if OpenTelemetry is included as a full dependency in the future or if you are modifying this module. OpenTelemetry is encapsulated so that no span objects from OpenTelemetry are exposed in Synapse's code. This allows OpenTelemetry to be easily disabled in Synapse and thereby have OpenTelemetry as an optional dependency. This does however limit the number of modifiable spans at any point in the code to one. From here out references to `tracing` in the code snippets refer to the Synapses module. Most methods provided in the module have a direct correlation to those provided by OpenTelemetry. Refer to docs there for a more in-depth documentation on some of the args and methods. Tracing ------- In Synapse, it is not possible to start a non-active span. Spans can be started using the ``start_active_span`` method. This returns a context manager that needs to be entered and exited to expose the ``span``. This is usually done by using a ``with`` statement. .. code-block:: python from synapse.logging.tracing import start_active_span with start_active_span("operation name"): # Do something we want to trace Forgetting to enter or exit a scope will result in unstarted and unfinished spans that will not be reported (exported). At anytime where there is an active span ``set_attribute`` can be used to set a tag on the current active span. Tracing functions ----------------- Functions can be easily traced using decorators. The name of the function becomes the operation name for the span. .. code-block:: python from synapse.logging.tracing import trace # Start a span using 'interesting_function' as the operation name @trace def interesting_function(*args, **kwargs): # Does all kinds of cool and expected things return something_usual_and_useful Operation names can be explicitly set for a function by using ``trace_with_opname``: .. code-block:: python from synapse.logging.tracing import trace_with_opname @trace_with_opname("a_better_operation_name") def interesting_badly_named_function(*args, **kwargs): # Does all kinds of cool and expected things return something_usual_and_useful Setting Tags ------------ To set a tag on the active span do .. code-block:: python from synapse.logging.tracing import set_attribute set_attribute(tag_name, tag_value) There's a convenient decorator to tag all the args of the method. It uses inspection in order to use the formal parameter names prefixed with 'ARG_' as tag names. It uses kwarg names as tag names without the prefix. .. code-block:: python from synapse.logging.tracing import tag_args @tag_args def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): pass set_fates("the story", "the end", "the act") # This will have the following tags # - ARG_clotho: "the story" # - ARG_lachesis: "the end" # - ARG_atropos: "the act" # - father: "Zues" # - mother: "Themis" Contexts and carriers --------------------- There are a selection of wrappers for injecting and extracting contexts from carriers provided. We use these to inject of OpenTelemetry Contexts into Twisted's http headers, EDU contents and our database tables. Please refer to the end of ``logging/tracing.py`` for the available injection and extraction methods. Homeserver whitelisting ----------------------- Most of the whitelist checks are encapsulated in the modules's injection and extraction method but be aware that using custom carriers or crossing unchartered waters will require the enforcement of the whitelist. ``logging/tracing.py`` has a ``whitelisted_homeserver`` method which takes in a destination and compares it to the whitelist. Most injection methods take a 'destination' arg. The context will only be injected if the destination matches the whitelist or the destination is None. ======= Gotchas ======= - Checking whitelists on span propagation - Inserting pii - Forgetting to enter or exit a scope - Span source: make sure that the span you expect to be active across a function call really will be that one. Does the current function have more than one caller? Will all of those calling functions have be in a context with an active span? """ import contextlib import inspect import logging import re from abc import ABC from functools import wraps from typing import ( TYPE_CHECKING, Any, Callable, ContextManager, Dict, Generator, Iterable, List, Optional, Pattern, Sequence, TypeVar, Union, cast, overload, ) from typing_extensions import ParamSpec from twisted.internet import defer from twisted.web.http import Request from twisted.web.http_headers import Headers from synapse.api.constants import EventContentFields from synapse.config import ConfigError from synapse.util import json_decoder if TYPE_CHECKING: from synapse.http.site import SynapseRequest from synapse.server import HomeServer # Helper class T = TypeVar("T") # Matches the number suffix in an instance name like "matrix.org client_reader-8" STRIP_INSTANCE_NUMBER_SUFFIX_REGEX = re.compile(r"[_-]?\d+$") class _DummyLookup(object): """This will always returns the fixed value given for any accessed property""" def __init__(self, value: T) -> None: self.value = value def __getattribute__(self, name: str) -> T: return object.__getattribute__(self, "value") class DummyLink(ABC): """Dummy placeholder for `opentelemetry.trace.Link`""" def __init__(self) -> None: self.not_implemented_message = ( "opentelemetry isn't installed so this is just a dummy link placeholder" ) @property def context(self) -> None: raise NotImplementedError(self.not_implemented_message) @property def attributes(self) -> None: raise NotImplementedError(self.not_implemented_message) # These dependencies are optional so they can fail to import # and we try: import opentelemetry import opentelemetry.exporter.jaeger.thrift import opentelemetry.propagate import opentelemetry.sdk.resources import opentelemetry.sdk.trace import opentelemetry.sdk.trace.export import opentelemetry.semconv.trace import opentelemetry.trace import opentelemetry.trace.propagation import opentelemetry.trace.status SpanKind = opentelemetry.trace.SpanKind SpanAttributes = opentelemetry.semconv.trace.SpanAttributes StatusCode = opentelemetry.trace.status.StatusCode Link = opentelemetry.trace.Link except ImportError: opentelemetry = None # type: ignore[assignment] SpanKind = _DummyLookup(0) # type: ignore SpanAttributes = _DummyLookup("fake-attribute") # type: ignore StatusCode = _DummyLookup(0) # type: ignore Link = DummyLink # type: ignore logger = logging.getLogger(__name__) class SynapseTags: """FIXME: Rename to `SynapseAttributes` so it matches OpenTelemetry `SpanAttributes`""" # The message ID of any to_device message processed TO_DEVICE_MESSAGE_ID = "to_device.message_id" # Whether the sync response has new data to be returned to the client. SYNC_RESULT = "sync.new_data" # The Synapse instance name INSTANCE_NAME = "instance_name" # incoming HTTP request ID (as written in the logs) REQUEST_ID = "request_id" # HTTP request tag (used to distinguish full vs incremental syncs, etc) REQUEST_TAG = "request_tag" # Text description of a database transaction DB_TXN_DESC = "db.txn_desc" # Uniqueish ID of a database transaction DB_TXN_ID = "db.txn_id" # The name of the external cache CACHE_NAME = "cache.name" # Used to tag function arguments # # Tag a named arg. The name of the argument should be appended to this prefix. FUNC_ARG_PREFIX = "ARG." # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`) FUNC_ARGS = "args" # Tag keyword args FUNC_KWARGS = "kwargs" # Some intermediate result that's interesting to the function. The label for # the result should be appended to this prefix. RESULT_PREFIX = "RESULT." class SynapseBaggage: FORCE_TRACING = "synapse-force-tracing" # Block everything by default # A regex which matches the server_names to expose traces for. # None means 'block everything'. _homeserver_whitelist: Optional[Pattern[str]] = None # Util methods P = ParamSpec("P") R = TypeVar("R") def only_if_tracing(func: Callable[P, R]) -> Callable[P, Optional[R]]: """Decorator function that executes the function only if we're tracing. Otherwise returns None.""" @wraps(func) def _only_if_tracing_inner(*args: P.args, **kwargs: P.kwargs) -> Optional[R]: if opentelemetry: return func(*args, **kwargs) else: return None return _only_if_tracing_inner @overload def ensure_active_span( message: str, ) -> Callable[[Callable[P, R]], Callable[P, Optional[R]]]: ... @overload def ensure_active_span( message: str, ret: T ) -> Callable[[Callable[P, R]], Callable[P, Union[T, R]]]: ... def ensure_active_span( message: str, ret: Optional[T] = None ) -> Callable[[Callable[P, R]], Callable[P, Union[Optional[T], R]]]: """Executes the operation only if opentelemetry is enabled and there is an active span. If there is no active span it logs message at the error level. Args: message: Message which fills in "There was no active span when trying to %s" in the error log if there is no active span and opentelemetry is enabled. ret: return value if opentelemetry is None or there is no active span. Returns: The result of the func, falling back to ret if opentelemetry is disabled or there was no active span. """ def ensure_active_span_inner_1( func: Callable[P, R] ) -> Callable[P, Union[Optional[T], R]]: @wraps(func) def ensure_active_span_inner_2( *args: P.args, **kwargs: P.kwargs ) -> Union[Optional[T], R]: if not opentelemetry: return ret if not opentelemetry.trace.get_current_span(): logger.error( "There was no active span when trying to %s." " Did you forget to start one or did a context slip?", message, stack_info=True, ) return ret return func(*args, **kwargs) return ensure_active_span_inner_2 return ensure_active_span_inner_1 # Setup def init_tracer(hs: "HomeServer") -> None: """Set the whitelists and initialise the OpenTelemetry tracer""" global opentelemetry if not hs.config.tracing.tracing_enabled: # We don't have a tracer opentelemetry = None # type: ignore[assignment] return if not opentelemetry: raise ConfigError( "The server has been configured to use OpenTelemetry but OpenTelemetry is not " "installed." ) # Pull out of the config if it was given. Otherwise set it to something sensible. set_homeserver_whitelist(hs.config.tracing.homeserver_whitelist) # Instance names are opaque strings but by stripping off the number suffix, # we can get something that looks like a "worker type", e.g. # "client_reader-1" -> "client_reader" so we don't spread the traces across # so many services. instance_name_by_type = re.sub( STRIP_INSTANCE_NUMBER_SUFFIX_REGEX, "", hs.get_instance_name() ) resource = opentelemetry.sdk.resources.Resource( attributes={ opentelemetry.sdk.resources.SERVICE_NAME: f"{hs.config.server.server_name} {instance_name_by_type}" } ) # TODO: `force_tracing_for_users` is not compatible with OTEL samplers # because you can only determine `opentelemetry.trace.TraceFlags.SAMPLED` # and whether it uses a recording span when the span is created and we don't # have enough information at that time (we can determine in # `synapse/api/auth.py`). There isn't a way to change the trace flags after # the fact so there is no way to programmatically force # recording/tracing/sampling like there was in opentracing. sampler = opentelemetry.sdk.trace.sampling.ParentBasedTraceIdRatio( hs.config.tracing.sample_rate ) tracer_provider = opentelemetry.sdk.trace.TracerProvider( resource=resource, sampler=sampler ) # consoleProcessor = opentelemetry.sdk.trace.export.BatchSpanProcessor( # opentelemetry.sdk.trace.export.ConsoleSpanExporter() # ) # tracer_provider.add_span_processor(consoleProcessor) jaeger_exporter = opentelemetry.exporter.jaeger.thrift.JaegerExporter( **hs.config.tracing.jaeger_exporter_config ) jaeger_processor = opentelemetry.sdk.trace.export.BatchSpanProcessor( jaeger_exporter ) tracer_provider.add_span_processor(jaeger_processor) # Sets the global default tracer provider opentelemetry.trace.set_tracer_provider(tracer_provider) # Whitelisting @only_if_tracing def set_homeserver_whitelist(homeserver_whitelist: Iterable[str]) -> None: """Sets the homeserver whitelist Args: homeserver_whitelist: regexes specifying whitelisted homeservers """ global _homeserver_whitelist if homeserver_whitelist: # Makes a single regex which accepts all passed in regexes in the list _homeserver_whitelist = re.compile( "({})".format(")|(".join(homeserver_whitelist)) ) @only_if_tracing def whitelisted_homeserver(destination: str) -> bool: """Checks if a destination matches the whitelist Args: destination """ if _homeserver_whitelist: return _homeserver_whitelist.match(destination) is not None return False # Start spans and scopes def use_span( span: Optional["opentelemetry.trace.Span"], end_on_exit: bool = True, ) -> ContextManager[Optional["opentelemetry.trace.Span"]]: if opentelemetry is None or span is None: return contextlib.nullcontext() return opentelemetry.trace.use_span(span=span, end_on_exit=end_on_exit) def create_non_recording_span() -> Optional["opentelemetry.trace.Span"]: """Create a no-op span that does not record or become part of a recorded trace""" if opentelemetry is None: return None # type: ignore[unreachable] return opentelemetry.trace.NonRecordingSpan( opentelemetry.trace.INVALID_SPAN_CONTEXT ) def start_span( name: str, *, context: Optional["opentelemetry.context.context.Context"] = None, kind: Optional["opentelemetry.trace.SpanKind"] = SpanKind.INTERNAL, attributes: "opentelemetry.util.types.Attributes" = None, links: Optional[Sequence["opentelemetry.trace.Link"]] = None, start_time: Optional[int] = None, record_exception: bool = True, set_status_on_exception: bool = True, end_on_exit: bool = True, # For testing only tracer: Optional["opentelemetry.trace.Tracer"] = None, ) -> "opentelemetry.trace.Span": if opentelemetry is None: raise Exception("Not able to create span without opentelemetry installed.") if tracer is None: tracer = opentelemetry.trace.get_tracer(__name__) # TODO: Why is this necessary to satisfy this error? It has a default? # ` error: Argument "kind" to "start_span" of "Tracer" has incompatible type "Optional[SpanKind]"; expected "SpanKind" [arg-type]` if kind is None: kind = SpanKind.INTERNAL return tracer.start_span( name=name, context=context, kind=kind, attributes=attributes, links=links, start_time=start_time, record_exception=record_exception, set_status_on_exception=set_status_on_exception, ) def start_active_span( name: str, *, context: Optional["opentelemetry.context.context.Context"] = None, kind: Optional["opentelemetry.trace.SpanKind"] = SpanKind.INTERNAL, attributes: "opentelemetry.util.types.Attributes" = None, links: Optional[Sequence["opentelemetry.trace.Link"]] = None, start_time: Optional[int] = None, record_exception: bool = True, set_status_on_exception: bool = True, end_on_exit: bool = True, # For testing only tracer: Optional["opentelemetry.trace.Tracer"] = None, ) -> ContextManager[Optional["opentelemetry.trace.Span"]]: if opentelemetry is None: return contextlib.nullcontext() # type: ignore[unreachable] # TODO: Why is this necessary to satisfy this error? It has a default? # ` error: Argument "kind" to "start_span" of "Tracer" has incompatible type "Optional[SpanKind]"; expected "SpanKind" [arg-type]` if kind is None: kind = SpanKind.INTERNAL span = start_span( name=name, context=context, kind=kind, attributes=attributes, links=links, start_time=start_time, record_exception=record_exception, set_status_on_exception=set_status_on_exception, tracer=tracer, ) # Equivalent to `tracer.start_as_current_span` return opentelemetry.trace.use_span( span, end_on_exit=end_on_exit, record_exception=record_exception, set_status_on_exception=set_status_on_exception, ) def start_active_span_from_edu( operation_name: str, *, edu_content: Dict[str, Any], ) -> ContextManager[Optional["opentelemetry.trace.Span"]]: """ Extracts a span context from an edu and uses it to start a new active span Args: operation_name: The label for the chunk of time used to process the given edu. edu_content: an edu_content with a `context` field whose value is canonical json for a dict which contains tracing information. """ if opentelemetry is None: return contextlib.nullcontext() # type: ignore[unreachable] carrier = json_decoder.decode( edu_content.get(EventContentFields.TRACING_CONTEXT, "{}") ) context = extract_text_map(carrier) return start_active_span(name=operation_name, context=context) # OpenTelemetry setters for attributes, logs, etc @only_if_tracing def get_active_span() -> Optional["opentelemetry.trace.Span"]: """Get the currently active span, if any""" return opentelemetry.trace.get_current_span() def get_span_context_from_context( context: "opentelemetry.context.context.Context", ) -> Optional["opentelemetry.trace.SpanContext"]: """Utility function to convert a `Context` to a `SpanContext` Based on https://github.com/open-telemetry/opentelemetry-python/blob/43288ca9a36144668797c11ca2654836ec8b5e99/opentelemetry-api/src/opentelemetry/trace/propagation/tracecontext.py#L99-L102 """ span = opentelemetry.trace.get_current_span(context=context) span_context = span.get_span_context() if span_context == opentelemetry.trace.INVALID_SPAN_CONTEXT: return None return span_context def get_context_from_span( span: "opentelemetry.trace.Span", ) -> "opentelemetry.context.context.Context": # This doesn't affect the current context at all, it just converts a span # into `Context` object basically (bad name). ctx = opentelemetry.trace.propagation.set_span_in_context(span) return ctx @ensure_active_span("set a tag") def set_attribute(key: str, value: Union[str, bool, int, float]) -> None: """Sets a tag on the active span""" active_span = get_active_span() assert active_span is not None active_span.set_attribute(key, value) @ensure_active_span("set the status") def set_status( status_code: "opentelemetry.trace.status.StatusCode", exc: Optional[Exception] ) -> None: """Sets a tag on the active span""" active_span = get_active_span() assert active_span is not None active_span.set_status(opentelemetry.trace.status.Status(status_code=status_code)) if exc: active_span.record_exception(exc) DEFAULT_LOG_NAME = "log" @ensure_active_span("log") def log_kv(key_values: Dict[str, Any], timestamp: Optional[int] = None) -> None: """Log to the active span""" active_span = get_active_span() assert active_span is not None event_name = key_values.get("event", DEFAULT_LOG_NAME) active_span.add_event(event_name, attributes=key_values, timestamp=timestamp) @only_if_tracing def force_tracing(span: Optional["opentelemetry.trace.Span"] = None) -> None: """Force sampling for the active/given span and its children. Args: span: span to force tracing for. By default, the active span. """ # TODO pass def is_context_forced_tracing( context: "opentelemetry.context.context.Context", ) -> bool: """Check if sampling has been force for the given span context.""" # TODO return False # Injection and extraction @ensure_active_span("inject the active tracing context into a header dict") def inject_active_tracing_context_into_header_dict( headers: Dict[bytes, List[bytes]], destination: Optional[str] = None, check_destination: bool = True, ) -> None: """ Injects the active tracing context into a dict of HTTP headers Args: headers: the dict to inject headers into destination: address of entity receiving the span context. Must be given unless `check_destination` is False. check_destination (bool): If False, destination will be ignored and the context will always be injected. If True, the context will only be injected if the destination matches the tracing allowlist Note: The headers set by the tracer are custom to the tracer implementation which should be unique enough that they don't interfere with any headers set by synapse or twisted. """ if check_destination: if destination is None: raise ValueError( "destination must be given unless check_destination is False" ) if not whitelisted_homeserver(destination): return active_span = get_active_span() assert active_span is not None ctx = get_context_from_span(active_span) propagator = opentelemetry.propagate.get_global_textmap() # Put all of SpanContext properties into the headers dict propagator.inject(headers, context=ctx) def inject_trace_id_into_response_headers(response_headers: Headers) -> None: """Inject the current trace id into the HTTP response headers""" if not opentelemetry: return active_span = get_active_span() if not active_span: return trace_id = active_span.get_span_context().trace_id if trace_id is not None: response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}") @ensure_active_span( "get the active span context as a dict", ret=cast(Dict[str, str], {}) ) def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str]: """ Gets the active tracing Context serialized as a dict. This can be used instead of manually injecting a span into an empty carrier. Args: destination: the name of the remote server. Returns: dict: the serialized active span's context if opentelemetry is enabled, otherwise empty. """ if destination and not whitelisted_homeserver(destination): return {} active_span = get_active_span() assert active_span is not None ctx = get_context_from_span(active_span) carrier_text_map: Dict[str, str] = {} propagator = opentelemetry.propagate.get_global_textmap() # Put all of Context properties onto the carrier text map that we can return propagator.inject(carrier_text_map, context=ctx) return carrier_text_map def context_from_request( request: Request, ) -> Optional["opentelemetry.context.context.Context"]: """Extract an opentelemetry context from the headers on an HTTP request This is useful when we have received an HTTP request from another part of our system, and want to link our spans to those of the remote system. """ if not opentelemetry: return None header_dict = { k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() } # Extract all of the relevant values from the headers to construct a # SpanContext to return. return extract_text_map(header_dict) @only_if_tracing def extract_text_map( carrier: Dict[str, str] ) -> Optional["opentelemetry.context.context.Context"]: """ Wrapper method for opentelemetry's propagator.extract for TEXT_MAP. Args: carrier: a dict possibly containing a context. Returns: The active context extracted from carrier. """ propagator = opentelemetry.propagate.get_global_textmap() # Extract all of the relevant values from the `carrier` to construct a # Context to return. return propagator.extract(carrier) # Tracing decorators def _custom_sync_async_decorator( func: Callable[P, R], wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]], ) -> Callable[P, R]: """ Decorates a function that is sync or async (coroutines), or that returns a Twisted `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`. Example usage: ```py # Decorator to time the function and log it out def duration(func: Callable[P, R]) -> Callable[P, R]: @contextlib.contextmanager def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Generator[None, None, None]: start_ts = time.time() try: yield finally: end_ts = time.time() duration = end_ts - start_ts logger.info("%s took %s seconds", func.__name__, duration) return _custom_sync_async_decorator(func, _wrapping_logic) ``` Args: func: The function to be decorated wrapping_logic: The business logic of your custom decorator. This should be a ContextManager so you are able to run your logic before/after the function as desired. """ if inspect.iscoroutinefunction(func): @wraps(func) async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: with wrapping_logic(func, *args, **kwargs): return await func(*args, **kwargs) # type: ignore[misc] else: # The other case here handles both sync functions and those # decorated with inlineDeferred. @wraps(func) def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: scope = wrapping_logic(func, *args, **kwargs) scope.__enter__() try: result = func(*args, **kwargs) if isinstance(result, defer.Deferred): def call_back(result: R) -> R: scope.__exit__(None, None, None) return result def err_back(result: R) -> R: scope.__exit__(None, None, None) return result result.addCallbacks(call_back, err_back) else: if inspect.isawaitable(result): logger.error( "@trace may not have wrapped %s correctly! " "The function is not async but returned a %s.", func.__qualname__, type(result).__name__, ) scope.__exit__(None, None, None) return result except Exception as e: scope.__exit__(type(e), None, e.__traceback__) raise return _wrapper # type: ignore[return-value] def trace_with_opname( opname: str, *, tracer: Optional["opentelemetry.trace.Tracer"] = None, ) -> Callable[[Callable[P, R]], Callable[P, R]]: """ Decorator to trace a function with a custom opname. See the module's doc string for usage examples. """ # type-ignore: mypy bug, see https://github.com/python/mypy/issues/12909 @contextlib.contextmanager # type: ignore[arg-type] def _wrapping_logic( func: Callable[P, R], *args: P.args, **kwargs: P.kwargs ) -> Generator[None, None, None]: with start_active_span(opname, tracer=tracer): yield def _decorator(func: Callable[P, R]) -> Callable[P, R]: if not opentelemetry: return func return _custom_sync_async_decorator(func, _wrapping_logic) return _decorator def trace(func: Callable[P, R]) -> Callable[P, R]: """ Decorator to trace a function. Sets the operation name to that of the function's name. See the module's doc string for usage examples. """ return trace_with_opname(func.__name__)(func) def tag_args(func: Callable[P, R]) -> Callable[P, R]: """ Tags all of the args to the active span. """ if not opentelemetry: return func # type-ignore: mypy bug, see https://github.com/python/mypy/issues/12909 @contextlib.contextmanager # type: ignore[arg-type] def _wrapping_logic( func: Callable[P, R], *args: P.args, **kwargs: P.kwargs ) -> Generator[None, None, None]: argspec = inspect.getfullargspec(func) # We use `[1:]` to skip the `self` object reference and `start=1` to # make the index line up with `argspec.args`. # # FIXME: We could update this to handle any type of function by ignoring the # first argument only if it's named `self` or `cls`. This isn't fool-proof # but handles the idiomatic cases. for i, arg in enumerate(args[1:], start=1): # type: ignore[index] set_attribute(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i], str(arg)) set_attribute(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :])) # type: ignore[index] set_attribute(SynapseTags.FUNC_KWARGS, str(kwargs)) yield return _custom_sync_async_decorator(func, _wrapping_logic) @contextlib.contextmanager def trace_servlet( request: "SynapseRequest", extract_context: bool = False ) -> Generator[None, None, None]: """Returns a context manager which traces a request. It starts a span with some servlet specific tags such as the request metrics name and request information. Args: request extract_context: Whether to attempt to extract the tracing context from the request the servlet is handling. """ if opentelemetry is None: yield # type: ignore[unreachable] return attrs = { SynapseTags.REQUEST_ID: request.get_request_id(), SpanAttributes.HTTP_METHOD: request.get_method(), SpanAttributes.HTTP_URL: request.get_redacted_uri(), SpanAttributes.HTTP_HOST: request.getClientAddress().host, } request_name = request.request_metrics.name tracing_context = context_from_request(request) if extract_context else None # This is will end up being the root span for all of servlet traces and we # aren't able to determine whether to force tracing yet. We can determine # whether to force trace later in `synapse/api/auth.py`. with start_active_span( request_name, kind=SpanKind.SERVER, context=tracing_context, attributes=attrs, # we configure the span not to finish immediately on exiting the scope, # and instead pass the span into the SynapseRequest (via # `request.set_tracing_span(span)`), which will finish it once we've # finished sending the response to the client. end_on_exit=False, ) as span: if span: request.set_tracing_span(span) inject_trace_id_into_response_headers(request.responseHeaders) try: yield finally: if span: # We set the operation name again in case its changed (which happens # with JsonResource). span.update_name(request.request_metrics.name) if request.request_metrics.start_context.tag is not None: span.set_attribute( SynapseTags.REQUEST_TAG, request.request_metrics.start_context.tag, )