summary refs log tree commit diff
path: root/synapse/logging
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/logging')
-rw-r--r--synapse/logging/context.py9
-rw-r--r--synapse/logging/opentracing.py1055
-rw-r--r--synapse/logging/scopecontextmanager.py171
-rw-r--r--synapse/logging/tracing.py1022
4 files changed, 1023 insertions, 1234 deletions
diff --git a/synapse/logging/context.py b/synapse/logging/context.py

index f62bea968f..56243aa5e7 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py
@@ -46,7 +46,6 @@ from twisted.internet import defer, threads from twisted.python.threadpool import ThreadPool if TYPE_CHECKING: - from synapse.logging.scopecontextmanager import _LogContextScope from synapse.types import ISynapseReactor logger = logging.getLogger(__name__) @@ -220,14 +219,13 @@ LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"] class _Sentinel: """Sentinel to represent the root context""" - __slots__ = ["previous_context", "finished", "request", "scope", "tag"] + __slots__ = ["previous_context", "finished", "request", "tag"] def __init__(self) -> None: # Minimal set for compatibility with LoggingContext self.previous_context = None self.finished = False self.request = None - self.scope = None self.tag = None def __str__(self) -> str: @@ -280,7 +278,6 @@ class LoggingContext: "finished", "request", "tag", - "scope", ] def __init__( @@ -301,7 +298,6 @@ class LoggingContext: self.main_thread = get_thread_id() self.request = None self.tag = "" - self.scope: Optional["_LogContextScope"] = None # keep track of whether we have hit the __exit__ block for this context # (suggesting that the the thing that created the context thinks it should @@ -314,9 +310,6 @@ class LoggingContext: # we track the current request_id self.request = self.parent_context.request - # we also track the current scope: - self.scope = self.parent_context.scope - if request is not None: # the request param overrides the request from the parent context self.request = request diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py deleted file mode 100644
index b69060854f..0000000000 --- a/synapse/logging/opentracing.py +++ /dev/null
@@ -1,1055 +0,0 @@ -# Copyright 2019 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# NOTE -# This is a small wrapper around opentracing because opentracing is not currently -# packaged downstream (specifically debian). Since opentracing instrumentation is -# fairly invasive it was awkward to make it optional. As a result we opted to encapsulate -# all opentracing state in these methods which effectively noop if opentracing is -# not present. We should strongly consider encouraging the downstream distributers -# to package opentracing and making opentracing a full dependency. In order to facilitate -# this move the methods have work very similarly to opentracing's and it should only -# be a matter of few regexes to move over to opentracing's access patterns proper. - -""" -============================ -Using OpenTracing in Synapse -============================ - -Python-specific tracing concepts are at https://opentracing.io/guides/python/. -Note that Synapse wraps OpenTracing in a small module (this one) in order to make the -OpenTracing dependency optional. That means that the access patterns are -different to those demonstrated in the OpenTracing guides. However, it is -still useful to know, especially if OpenTracing is included as a full dependency -in the future or if you are modifying this module. - - -OpenTracing is encapsulated so that -no span objects from OpenTracing are exposed in Synapse's code. This allows -OpenTracing to be easily disabled in Synapse and thereby have OpenTracing as -an optional dependency. This does however limit the number of modifiable spans -at any point in the code to one. From here out references to `opentracing` -in the code snippets refer to the Synapses module. -Most methods provided in the module have a direct correlation to those provided -by opentracing. Refer to docs there for a more in-depth documentation on some of -the args and methods. - -Tracing -------- - -In Synapse it is not possible to start a non-active span. Spans can be started -using the ``start_active_span`` method. This returns a scope (see -OpenTracing docs) which is a context manager that needs to be entered and -exited. This is usually done by using ``with``. - -.. code-block:: python - - from synapse.logging.opentracing import start_active_span - - with start_active_span("operation name"): - # Do something we want to tracer - -Forgetting to enter or exit a scope will result in some mysterious and grievous log -context errors. - -At anytime where there is an active span ``opentracing.set_tag`` can be used to -set a tag on the current active span. - -Tracing functions ------------------ - -Functions can be easily traced using decorators. The name of -the function becomes the operation name for the span. - -.. code-block:: python - - from synapse.logging.opentracing import trace - - # Start a span using 'interesting_function' as the operation name - @trace - def interesting_function(*args, **kwargs): - # Does all kinds of cool and expected things - return something_usual_and_useful - - -Operation names can be explicitly set for a function by using ``trace_with_opname``: - -.. code-block:: python - - from synapse.logging.opentracing import trace_with_opname - - @trace_with_opname("a_better_operation_name") - def interesting_badly_named_function(*args, **kwargs): - # Does all kinds of cool and expected things - return something_usual_and_useful - -Setting Tags ------------- - -To set a tag on the active span do - -.. code-block:: python - - from synapse.logging.opentracing import set_tag - - set_tag(tag_name, tag_value) - -There's a convenient decorator to tag all the args of the method. It uses -inspection in order to use the formal parameter names prefixed with 'ARG_' as -tag names. It uses kwarg names as tag names without the prefix. - -.. code-block:: python - - from synapse.logging.opentracing import tag_args - - @tag_args - def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): - pass - - set_fates("the story", "the end", "the act") - # This will have the following tags - # - ARG_clotho: "the story" - # - ARG_lachesis: "the end" - # - ARG_atropos: "the act" - # - father: "Zues" - # - mother: "Themis" - -Contexts and carriers ---------------------- - -There are a selection of wrappers for injecting and extracting contexts from -carriers provided. Unfortunately OpenTracing's three context injection -techniques are not adequate for our inject of OpenTracing span-contexts into -Twisted's http headers, EDU contents and our database tables. Also note that -the binary encoding format mandated by OpenTracing is not actually implemented -by jaeger_client v4.0.0 - it will silently noop. -Please refer to the end of ``logging/opentracing.py`` for the available -injection and extraction methods. - -Homeserver whitelisting ------------------------ - -Most of the whitelist checks are encapsulated in the modules's injection -and extraction method but be aware that using custom carriers or crossing -unchartered waters will require the enforcement of the whitelist. -``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes -in a destination and compares it to the whitelist. - -Most injection methods take a 'destination' arg. The context will only be injected -if the destination matches the whitelist or the destination is None. - -======= -Gotchas -======= - -- Checking whitelists on span propagation -- Inserting pii -- Forgetting to enter or exit a scope -- Span source: make sure that the span you expect to be active across a - function call really will be that one. Does the current function have more - than one caller? Will all of those calling functions have be in a context - with an active span? -""" -import contextlib -import enum -import inspect -import logging -import re -from functools import wraps -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Collection, - ContextManager, - Dict, - Generator, - Iterable, - List, - Optional, - Pattern, - Type, - TypeVar, - Union, - cast, - overload, -) - -import attr -from typing_extensions import ParamSpec - -from twisted.internet import defer -from twisted.web.http import Request -from twisted.web.http_headers import Headers - -from synapse.config import ConfigError -from synapse.util import json_decoder, json_encoder - -if TYPE_CHECKING: - from synapse.http.site import SynapseRequest - from synapse.server import HomeServer - -# Helper class - -# Matches the number suffix in an instance name like "matrix.org client_reader-8" -STRIP_INSTANCE_NUMBER_SUFFIX_REGEX = re.compile(r"[_-]?\d+$") - - -class _DummyTagNames: - """wrapper of opentracings tags. We need to have them if we - want to reference them without opentracing around. Clearly they - should never actually show up in a trace. `set_tags` overwrites - these with the correct ones.""" - - INVALID_TAG = "invalid-tag" - COMPONENT = INVALID_TAG - DATABASE_INSTANCE = INVALID_TAG - DATABASE_STATEMENT = INVALID_TAG - DATABASE_TYPE = INVALID_TAG - DATABASE_USER = INVALID_TAG - ERROR = INVALID_TAG - HTTP_METHOD = INVALID_TAG - HTTP_STATUS_CODE = INVALID_TAG - HTTP_URL = INVALID_TAG - MESSAGE_BUS_DESTINATION = INVALID_TAG - PEER_ADDRESS = INVALID_TAG - PEER_HOSTNAME = INVALID_TAG - PEER_HOST_IPV4 = INVALID_TAG - PEER_HOST_IPV6 = INVALID_TAG - PEER_PORT = INVALID_TAG - PEER_SERVICE = INVALID_TAG - SAMPLING_PRIORITY = INVALID_TAG - SERVICE = INVALID_TAG - SPAN_KIND = INVALID_TAG - SPAN_KIND_CONSUMER = INVALID_TAG - SPAN_KIND_PRODUCER = INVALID_TAG - SPAN_KIND_RPC_CLIENT = INVALID_TAG - SPAN_KIND_RPC_SERVER = INVALID_TAG - - -try: - import opentracing - import opentracing.tags - - tags = opentracing.tags -except ImportError: - opentracing = None # type: ignore[assignment] - tags = _DummyTagNames # type: ignore[assignment] -try: - from jaeger_client import Config as JaegerConfig - - from synapse.logging.scopecontextmanager import LogContextScopeManager -except ImportError: - JaegerConfig = None # type: ignore - LogContextScopeManager = None # type: ignore - - -try: - from rust_python_jaeger_reporter import Reporter - - # jaeger-client 4.7.0 requires that reporters inherit from BaseReporter, which - # didn't exist before that version. - try: - from jaeger_client.reporter import BaseReporter - except ImportError: - - class BaseReporter: # type: ignore[no-redef] - pass - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class _WrappedRustReporter(BaseReporter): - """Wrap the reporter to ensure `report_span` never throws.""" - - _reporter: Reporter = attr.Factory(Reporter) - - def set_process(self, *args: Any, **kwargs: Any) -> None: - return self._reporter.set_process(*args, **kwargs) - - def report_span(self, span: "opentracing.Span") -> None: - try: - return self._reporter.report_span(span) - except Exception: - logger.exception("Failed to report span") - - RustReporter: Optional[Type[_WrappedRustReporter]] = _WrappedRustReporter -except ImportError: - RustReporter = None - - -logger = logging.getLogger(__name__) - - -class SynapseTags: - # The message ID of any to_device message processed - TO_DEVICE_MESSAGE_ID = "to_device.message_id" - - # Whether the sync response has new data to be returned to the client. - SYNC_RESULT = "sync.new_data" - - INSTANCE_NAME = "instance_name" - - # incoming HTTP request ID (as written in the logs) - REQUEST_ID = "request_id" - - # HTTP request tag (used to distinguish full vs incremental syncs, etc) - REQUEST_TAG = "request_tag" - - # Text description of a database transaction - DB_TXN_DESC = "db.txn_desc" - - # Uniqueish ID of a database transaction - DB_TXN_ID = "db.txn_id" - - # The name of the external cache - CACHE_NAME = "cache.name" - - # Used to tag function arguments - # - # Tag a named arg. The name of the argument should be appended to this prefix. - FUNC_ARG_PREFIX = "ARG." - # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`) - FUNC_ARGS = "args" - # Tag keyword args - FUNC_KWARGS = "kwargs" - - # Some intermediate result that's interesting to the function. The label for - # the result should be appended to this prefix. - RESULT_PREFIX = "RESULT." - - -class SynapseBaggage: - FORCE_TRACING = "synapse-force-tracing" - - -# Block everything by default -# A regex which matches the server_names to expose traces for. -# None means 'block everything'. -_homeserver_whitelist: Optional[Pattern[str]] = None - -# Util methods - - -class _Sentinel(enum.Enum): - # defining a sentinel in this way allows mypy to correctly handle the - # type of a dictionary lookup. - sentinel = object() - - -P = ParamSpec("P") -R = TypeVar("R") -T = TypeVar("T") - - -def only_if_tracing(func: Callable[P, R]) -> Callable[P, Optional[R]]: - """Executes the function only if we're tracing. Otherwise returns None.""" - - @wraps(func) - def _only_if_tracing_inner(*args: P.args, **kwargs: P.kwargs) -> Optional[R]: - if opentracing: - return func(*args, **kwargs) - else: - return None - - return _only_if_tracing_inner - - -@overload -def ensure_active_span( - message: str, -) -> Callable[[Callable[P, R]], Callable[P, Optional[R]]]: - ... - - -@overload -def ensure_active_span( - message: str, ret: T -) -> Callable[[Callable[P, R]], Callable[P, Union[T, R]]]: - ... - - -def ensure_active_span( - message: str, ret: Optional[T] = None -) -> Callable[[Callable[P, R]], Callable[P, Union[Optional[T], R]]]: - """Executes the operation only if opentracing is enabled and there is an active span. - If there is no active span it logs message at the error level. - - Args: - message: Message which fills in "There was no active span when trying to %s" - in the error log if there is no active span and opentracing is enabled. - ret: return value if opentracing is None or there is no active span. - - Returns: - The result of the func, falling back to ret if opentracing is disabled or there - was no active span. - """ - - def ensure_active_span_inner_1( - func: Callable[P, R] - ) -> Callable[P, Union[Optional[T], R]]: - @wraps(func) - def ensure_active_span_inner_2( - *args: P.args, **kwargs: P.kwargs - ) -> Union[Optional[T], R]: - if not opentracing: - return ret - - if not opentracing.tracer.active_span: - logger.error( - "There was no active span when trying to %s." - " Did you forget to start one or did a context slip?", - message, - stack_info=True, - ) - - return ret - - return func(*args, **kwargs) - - return ensure_active_span_inner_2 - - return ensure_active_span_inner_1 - - -# Setup - - -def init_tracer(hs: "HomeServer") -> None: - """Set the whitelists and initialise the JaegerClient tracer""" - global opentracing - if not hs.config.tracing.opentracer_enabled: - # We don't have a tracer - opentracing = None # type: ignore[assignment] - return - - if not opentracing or not JaegerConfig: - raise ConfigError( - "The server has been configured to use opentracing but opentracing is not " - "installed." - ) - - # Pull out the jaeger config if it was given. Otherwise set it to something sensible. - # See https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/config.py - - set_homeserver_whitelist(hs.config.tracing.opentracer_whitelist) - - from jaeger_client.metrics.prometheus import PrometheusMetricsFactory - - # Instance names are opaque strings but by stripping off the number suffix, - # we can get something that looks like a "worker type", e.g. - # "client_reader-1" -> "client_reader" so we don't spread the traces across - # so many services. - instance_name_by_type = re.sub( - STRIP_INSTANCE_NUMBER_SUFFIX_REGEX, "", hs.get_instance_name() - ) - - config = JaegerConfig( - config=hs.config.tracing.jaeger_config, - service_name=f"{hs.config.server.server_name} {instance_name_by_type}", - scope_manager=LogContextScopeManager(), - metrics_factory=PrometheusMetricsFactory(), - ) - - # If we have the rust jaeger reporter available let's use that. - if RustReporter: - logger.info("Using rust_python_jaeger_reporter library") - assert config.sampler is not None - tracer = config.create_tracer(RustReporter(), config.sampler) - opentracing.set_global_tracer(tracer) - else: - config.initialize_tracer() - - -# Whitelisting - - -@only_if_tracing -def set_homeserver_whitelist(homeserver_whitelist: Iterable[str]) -> None: - """Sets the homeserver whitelist - - Args: - homeserver_whitelist: regexes specifying whitelisted homeservers - """ - global _homeserver_whitelist - if homeserver_whitelist: - # Makes a single regex which accepts all passed in regexes in the list - _homeserver_whitelist = re.compile( - "({})".format(")|(".join(homeserver_whitelist)) - ) - - -@only_if_tracing -def whitelisted_homeserver(destination: str) -> bool: - """Checks if a destination matches the whitelist - - Args: - destination - """ - - if _homeserver_whitelist: - return _homeserver_whitelist.match(destination) is not None - return False - - -# Start spans and scopes - -# Could use kwargs but I want these to be explicit -def start_active_span( - operation_name: str, - child_of: Optional[Union["opentracing.Span", "opentracing.SpanContext"]] = None, - references: Optional[List["opentracing.Reference"]] = None, - tags: Optional[Dict[str, str]] = None, - start_time: Optional[float] = None, - ignore_active_span: bool = False, - finish_on_close: bool = True, - *, - tracer: Optional["opentracing.Tracer"] = None, -) -> "opentracing.Scope": - """Starts an active opentracing span. - - Records the start time for the span, and sets it as the "active span" in the - scope manager. - - Args: - See opentracing.tracer - Returns: - scope (Scope) or contextlib.nullcontext - """ - - if opentracing is None: - return contextlib.nullcontext() # type: ignore[unreachable] - - if tracer is None: - # use the global tracer by default - tracer = opentracing.tracer - - return tracer.start_active_span( - operation_name, - child_of=child_of, - references=references, - tags=tags, - start_time=start_time, - ignore_active_span=ignore_active_span, - finish_on_close=finish_on_close, - ) - - -def start_active_span_follows_from( - operation_name: str, - contexts: Collection, - child_of: Optional[Union["opentracing.Span", "opentracing.SpanContext"]] = None, - start_time: Optional[float] = None, - *, - inherit_force_tracing: bool = False, - tracer: Optional["opentracing.Tracer"] = None, -) -> "opentracing.Scope": - """Starts an active opentracing span, with additional references to previous spans - - Args: - operation_name: name of the operation represented by the new span - contexts: the previous spans to inherit from - - child_of: optionally override the parent span. If unset, the currently active - span will be the parent. (If there is no currently active span, the first - span in `contexts` will be the parent.) - - start_time: optional override for the start time of the created span. Seconds - since the epoch. - - inherit_force_tracing: if set, and any of the previous contexts have had tracing - forced, the new span will also have tracing forced. - tracer: override the opentracing tracer. By default the global tracer is used. - """ - if opentracing is None: - return contextlib.nullcontext() # type: ignore[unreachable] - - references = [opentracing.follows_from(context) for context in contexts] - scope = start_active_span( - operation_name, - child_of=child_of, - references=references, - start_time=start_time, - tracer=tracer, - ) - - if inherit_force_tracing and any( - is_context_forced_tracing(ctx) for ctx in contexts - ): - force_tracing(scope.span) - - return scope - - -def start_active_span_from_edu( - edu_content: Dict[str, Any], - operation_name: str, - references: Optional[List["opentracing.Reference"]] = None, - tags: Optional[Dict[str, str]] = None, - start_time: Optional[float] = None, - ignore_active_span: bool = False, - finish_on_close: bool = True, -) -> "opentracing.Scope": - """ - Extracts a span context from an edu and uses it to start a new active span - - Args: - edu_content: an edu_content with a `context` field whose value is - canonical json for a dict which contains opentracing information. - - For the other args see opentracing.tracer - """ - references = references or [] - - if opentracing is None: - return contextlib.nullcontext() # type: ignore[unreachable] - - carrier = json_decoder.decode(edu_content.get("context", "{}")).get( - "opentracing", {} - ) - context = opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier) - _references = [ - opentracing.child_of(span_context_from_string(x)) - for x in carrier.get("references", []) - ] - - # For some reason jaeger decided not to support the visualization of multiple parent - # spans or explicitly show references. I include the span context as a tag here as - # an aid to people debugging but it's really not an ideal solution. - - references += _references - - scope = opentracing.tracer.start_active_span( - operation_name, - child_of=context, - references=references, - tags=tags, - start_time=start_time, - ignore_active_span=ignore_active_span, - finish_on_close=finish_on_close, - ) - - scope.span.set_tag("references", carrier.get("references", [])) - return scope - - -# Opentracing setters for tags, logs, etc -@only_if_tracing -def active_span() -> Optional["opentracing.Span"]: - """Get the currently active span, if any""" - return opentracing.tracer.active_span - - -@ensure_active_span("set a tag") -def set_tag(key: str, value: Union[str, bool, int, float]) -> None: - """Sets a tag on the active span""" - assert opentracing.tracer.active_span is not None - opentracing.tracer.active_span.set_tag(key, value) - - -@ensure_active_span("log") -def log_kv(key_values: Dict[str, Any], timestamp: Optional[float] = None) -> None: - """Log to the active span""" - assert opentracing.tracer.active_span is not None - opentracing.tracer.active_span.log_kv(key_values, timestamp) - - -@ensure_active_span("set the traces operation name") -def set_operation_name(operation_name: str) -> None: - """Sets the operation name of the active span""" - assert opentracing.tracer.active_span is not None - opentracing.tracer.active_span.set_operation_name(operation_name) - - -@only_if_tracing -def force_tracing( - span: Union["opentracing.Span", _Sentinel] = _Sentinel.sentinel -) -> None: - """Force sampling for the active/given span and its children. - - Args: - span: span to force tracing for. By default, the active span. - """ - if isinstance(span, _Sentinel): - span_to_trace = opentracing.tracer.active_span - else: - span_to_trace = span - if span_to_trace is None: - logger.error("No active span in force_tracing") - return - - span_to_trace.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1) - - # also set a bit of baggage, so that we have a way of figuring out if - # it is enabled later - span_to_trace.set_baggage_item(SynapseBaggage.FORCE_TRACING, "1") - - -def is_context_forced_tracing( - span_context: Optional["opentracing.SpanContext"], -) -> bool: - """Check if sampling has been force for the given span context.""" - if span_context is None: - return False - return span_context.baggage.get(SynapseBaggage.FORCE_TRACING) is not None - - -# Injection and extraction - - -@ensure_active_span("inject the span into a header dict") -def inject_header_dict( - headers: Dict[bytes, List[bytes]], - destination: Optional[str] = None, - check_destination: bool = True, -) -> None: - """ - Injects a span context into a dict of HTTP headers - - Args: - headers: the dict to inject headers into - destination: address of entity receiving the span context. Must be given unless - check_destination is False. The context will only be injected if the - destination matches the opentracing whitelist - check_destination: If false, destination will be ignored and the context - will always be injected. - - Note: - The headers set by the tracer are custom to the tracer implementation which - should be unique enough that they don't interfere with any headers set by - synapse or twisted. If we're still using jaeger these headers would be those - here: - https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py - """ - if check_destination: - if destination is None: - raise ValueError( - "destination must be given unless check_destination is False" - ) - if not whitelisted_homeserver(destination): - return - - span = opentracing.tracer.active_span - - carrier: Dict[str, str] = {} - assert span is not None - opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier) - - for key, value in carrier.items(): - headers[key.encode()] = [value.encode()] - - -def inject_response_headers(response_headers: Headers) -> None: - """Inject the current trace id into the HTTP response headers""" - if not opentracing: - return - span = opentracing.tracer.active_span - if not span: - return - - # This is a bit implementation-specific. - # - # Jaeger's Spans have a trace_id property; other implementations (including the - # dummy opentracing.span.Span which we use if init_tracer is not called) do not - # expose it - trace_id = getattr(span, "trace_id", None) - - if trace_id is not None: - response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}") - - -@ensure_active_span( - "get the active span context as a dict", ret=cast(Dict[str, str], {}) -) -def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str]: - """ - Gets a span context as a dict. This can be used instead of manually - injecting a span into an empty carrier. - - Args: - destination: the name of the remote server. - - Returns: - the active span's context if opentracing is enabled, otherwise empty. - """ - - if destination and not whitelisted_homeserver(destination): - return {} - - carrier: Dict[str, str] = {} - assert opentracing.tracer.active_span is not None - opentracing.tracer.inject( - opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier - ) - - return carrier - - -@ensure_active_span("get the span context as a string.", ret={}) -def active_span_context_as_string() -> str: - """ - Returns: - The active span context encoded as a string. - """ - carrier: Dict[str, str] = {} - if opentracing: - assert opentracing.tracer.active_span is not None - opentracing.tracer.inject( - opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier - ) - return json_encoder.encode(carrier) - - -def span_context_from_request(request: Request) -> "Optional[opentracing.SpanContext]": - """Extract an opentracing context from the headers on an HTTP request - - This is useful when we have received an HTTP request from another part of our - system, and want to link our spans to those of the remote system. - """ - if not opentracing: - return None - header_dict = { - k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() - } - return opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict) - - -@only_if_tracing -def span_context_from_string(carrier: str) -> Optional["opentracing.SpanContext"]: - """ - Returns: - The active span context decoded from a string. - """ - payload: Dict[str, str] = json_decoder.decode(carrier) - return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, payload) - - -@only_if_tracing -def extract_text_map(carrier: Dict[str, str]) -> Optional["opentracing.SpanContext"]: - """ - Wrapper method for opentracing's tracer.extract for TEXT_MAP. - Args: - carrier: a dict possibly containing a span context. - - Returns: - The active span context extracted from carrier. - """ - return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier) - - -# Tracing decorators - - -def _custom_sync_async_decorator( - func: Callable[P, R], - wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]], -) -> Callable[P, R]: - """ - Decorates a function that is sync or async (coroutines), or that returns a Twisted - `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`. - - Example usage: - ```py - # Decorator to time the function and log it out - def duration(func: Callable[P, R]) -> Callable[P, R]: - @contextlib.contextmanager - def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Generator[None, None, None]: - start_ts = time.time() - try: - yield - finally: - end_ts = time.time() - duration = end_ts - start_ts - logger.info("%s took %s seconds", func.__name__, duration) - return _custom_sync_async_decorator(func, _wrapping_logic) - ``` - - Args: - func: The function to be decorated - wrapping_logic: The business logic of your custom decorator. - This should be a ContextManager so you are able to run your logic - before/after the function as desired. - """ - - if inspect.iscoroutinefunction(func): - - @wraps(func) - async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - with wrapping_logic(func, *args, **kwargs): - return await func(*args, **kwargs) # type: ignore[misc] - - else: - # The other case here handles both sync functions and those - # decorated with inlineDeferred. - @wraps(func) - def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - scope = wrapping_logic(func, *args, **kwargs) - scope.__enter__() - - try: - result = func(*args, **kwargs) - if isinstance(result, defer.Deferred): - - def call_back(result: R) -> R: - scope.__exit__(None, None, None) - return result - - def err_back(result: R) -> R: - scope.__exit__(None, None, None) - return result - - result.addCallbacks(call_back, err_back) - - else: - if inspect.isawaitable(result): - logger.error( - "@trace may not have wrapped %s correctly! " - "The function is not async but returned a %s.", - func.__qualname__, - type(result).__name__, - ) - - scope.__exit__(None, None, None) - - return result - - except Exception as e: - scope.__exit__(type(e), None, e.__traceback__) - raise - - return _wrapper # type: ignore[return-value] - - -def trace_with_opname( - opname: str, - *, - tracer: Optional["opentracing.Tracer"] = None, -) -> Callable[[Callable[P, R]], Callable[P, R]]: - """ - Decorator to trace a function with a custom opname. - See the module's doc string for usage examples. - """ - - # type-ignore: mypy bug, see https://github.com/python/mypy/issues/12909 - @contextlib.contextmanager # type: ignore[arg-type] - def _wrapping_logic( - func: Callable[P, R], *args: P.args, **kwargs: P.kwargs - ) -> Generator[None, None, None]: - with start_active_span(opname, tracer=tracer): - yield - - def _decorator(func: Callable[P, R]) -> Callable[P, R]: - if not opentracing: - return func - - return _custom_sync_async_decorator(func, _wrapping_logic) - - return _decorator - - -def trace(func: Callable[P, R]) -> Callable[P, R]: - """ - Decorator to trace a function. - Sets the operation name to that of the function's name. - See the module's doc string for usage examples. - """ - - return trace_with_opname(func.__name__)(func) - - -def tag_args(func: Callable[P, R]) -> Callable[P, R]: - """ - Decorator to tag all of the args to the active span. - - Args: - func: `func` is assumed to be a method taking a `self` parameter, or a - `classmethod` taking a `cls` parameter. In either case, a tag is not - created for this parameter. - """ - - if not opentracing: - return func - - # type-ignore: mypy bug, see https://github.com/python/mypy/issues/12909 - @contextlib.contextmanager # type: ignore[arg-type] - def _wrapping_logic( - func: Callable[P, R], *args: P.args, **kwargs: P.kwargs - ) -> Generator[None, None, None]: - argspec = inspect.getfullargspec(func) - # We use `[1:]` to skip the `self` object reference and `start=1` to - # make the index line up with `argspec.args`. - # - # FIXME: We could update this to handle any type of function by ignoring the - # first argument only if it's named `self` or `cls`. This isn't fool-proof - # but handles the idiomatic cases. - for i, arg in enumerate(args[1:], start=1): - set_tag(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i], str(arg)) - set_tag(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :])) - set_tag(SynapseTags.FUNC_KWARGS, str(kwargs)) - yield - - return _custom_sync_async_decorator(func, _wrapping_logic) - - -@contextlib.contextmanager -def trace_servlet( - request: "SynapseRequest", extract_context: bool = False -) -> Generator[None, None, None]: - """Returns a context manager which traces a request. It starts a span - with some servlet specific tags such as the request metrics name and - request information. - - Args: - request - extract_context: Whether to attempt to extract the opentracing - context from the request the servlet is handling. - """ - - if opentracing is None: - yield # type: ignore[unreachable] - return - - request_tags = { - SynapseTags.REQUEST_ID: request.get_request_id(), - tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, - tags.HTTP_METHOD: request.get_method(), - tags.HTTP_URL: request.get_redacted_uri(), - tags.PEER_HOST_IPV6: request.getClientAddress().host, - } - - request_name = request.request_metrics.name - context = span_context_from_request(request) if extract_context else None - - # we configure the scope not to finish the span immediately on exit, and instead - # pass the span into the SynapseRequest, which will finish it once we've finished - # sending the response to the client. - scope = start_active_span(request_name, child_of=context, finish_on_close=False) - request.set_opentracing_span(scope.span) - - with scope: - inject_response_headers(request.responseHeaders) - try: - yield - finally: - # We set the operation name again in case its changed (which happens - # with JsonResource). - scope.span.set_operation_name(request.request_metrics.name) - - request_tags[ - SynapseTags.REQUEST_TAG - ] = request.request_metrics.start_context.tag - - # set the tags *after* the servlet completes, in case it decided to - # prioritise the span (tags will get dropped on unprioritised spans) - for k, v in request_tags.items(): - scope.span.set_tag(k, v) diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py deleted file mode 100644
index 10877bdfc5..0000000000 --- a/synapse/logging/scopecontextmanager.py +++ /dev/null
@@ -1,171 +0,0 @@ -# Copyright 2019 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License.import logging - -import logging -from types import TracebackType -from typing import Optional, Type - -from opentracing import Scope, ScopeManager, Span - -import twisted - -from synapse.logging.context import ( - LoggingContext, - current_context, - nested_logging_context, -) - -logger = logging.getLogger(__name__) - - -class LogContextScopeManager(ScopeManager): - """ - The LogContextScopeManager tracks the active scope in opentracing - by using the log contexts which are native to synapse. This is so - that the basic opentracing api can be used across twisted defereds. - - It would be nice just to use opentracing's ContextVarsScopeManager, - but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301. - """ - - def __init__(self) -> None: - pass - - @property - def active(self) -> Optional[Scope]: - """ - Returns the currently active Scope which can be used to access the - currently active Scope.span. - If there is a non-null Scope, its wrapped Span - becomes an implicit parent of any newly-created Span at - Tracer.start_active_span() time. - - Return: - The Scope that is active, or None if not available. - """ - ctx = current_context() - return ctx.scope - - def activate(self, span: Span, finish_on_close: bool) -> Scope: - """ - Makes a Span active. - Args - span: the span that should become active. - finish_on_close: whether Span should be automatically finished when - Scope.close() is called. - - Returns: - Scope to control the end of the active period for - *span*. It is a programming error to neglect to call - Scope.close() on the returned instance. - """ - - ctx = current_context() - - if not ctx: - logger.error("Tried to activate scope outside of loggingcontext") - return Scope(None, span) # type: ignore[arg-type] - - if ctx.scope is not None: - # start a new logging context as a child of the existing one. - # Doing so -- rather than updating the existing logcontext -- means that - # creating several concurrent spans under the same logcontext works - # correctly. - ctx = nested_logging_context("") - enter_logcontext = True - else: - # if there is no span currently associated with the current logcontext, we - # just store the scope in it. - # - # This feels a bit dubious, but it does hack around a problem where a - # span outlasts its parent logcontext (which would otherwise lead to - # "Re-starting finished log context" errors). - enter_logcontext = False - - scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close) - ctx.scope = scope - if enter_logcontext: - ctx.__enter__() - - return scope - - -class _LogContextScope(Scope): - """ - A custom opentracing scope, associated with a LogContext - - * filters out _DefGen_Return exceptions which arise from calling - `defer.returnValue` in Twisted code - - * When the scope is closed, the logcontext's active scope is reset to None. - and - if enter_logcontext was set - the logcontext is finished too. - """ - - def __init__( - self, - manager: LogContextScopeManager, - span: Span, - logcontext: LoggingContext, - enter_logcontext: bool, - finish_on_close: bool, - ): - """ - Args: - manager: - the manager that is responsible for this scope. - span: - the opentracing span which this scope represents the local - lifetime for. - logcontext: - the log context to which this scope is attached. - enter_logcontext: - if True the log context will be exited when the scope is finished - finish_on_close: - if True finish the span when the scope is closed - """ - super().__init__(manager, span) - self.logcontext = logcontext - self._finish_on_close = finish_on_close - self._enter_logcontext = enter_logcontext - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - value: Optional[BaseException], - traceback: Optional[TracebackType], - ) -> None: - if exc_type == twisted.internet.defer._DefGen_Return: - # filter out defer.returnValue() calls - exc_type = value = traceback = None - super().__exit__(exc_type, value, traceback) - - def __str__(self) -> str: - return f"Scope<{self.span}>" - - def close(self) -> None: - active_scope = self.manager.active - if active_scope is not self: - logger.error( - "Closing scope %s which is not the currently-active one %s", - self, - active_scope, - ) - - if self._finish_on_close: - self.span.finish() - - self.logcontext.scope = None - - if self._enter_logcontext: - self.logcontext.__exit__(None, None, None) diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py new file mode 100644
index 0000000000..d455854467 --- /dev/null +++ b/synapse/logging/tracing.py
@@ -0,0 +1,1022 @@ +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# NOTE This is a small wrapper around opentelemetry because tracing is optional +# and not always packaged downstream. Since opentelemetry instrumentation is +# fairly invasive it was awkward to make it optional. As a result we opted to +# encapsulate all opentelemetry state in these methods which effectively noop if +# opentelemetry is not present. We should strongly consider encouraging the +# downstream distributers to package opentelemetry and making opentelemetry a +# full dependency. In order to facilitate this move the methods have work very +# similarly to opentelemetry's and it should only be a matter of few regexes to +# move over to opentelemetry's access patterns proper. + +""" +============================ +Using OpenTelemetry in Synapse +============================ + +Python-specific tracing concepts are at +https://opentelemetry.io/docs/instrumentation/python/. Note that Synapse wraps +OpenTelemetry in a small module (this one) in order to make the OpenTelemetry +dependency optional. That means that some access patterns are different to those +demonstrated in the OpenTelemetry guides. However, it is still useful to know, +especially if OpenTelemetry is included as a full dependency in the future or if +you are modifying this module. + + +OpenTelemetry is encapsulated so that no span objects from OpenTelemetry are +exposed in Synapse's code. This allows OpenTelemetry to be easily disabled in +Synapse and thereby have OpenTelemetry as an optional dependency. This does +however limit the number of modifiable spans at any point in the code to one. +From here out references to `tracing` in the code snippets refer to the Synapses +module. Most methods provided in the module have a direct correlation to those +provided by OpenTelemetry. Refer to docs there for a more in-depth documentation +on some of the args and methods. + +Tracing +------- + +In Synapse, it is not possible to start a non-active span. Spans can be started +using the ``start_active_span`` method. This returns a context manager that +needs to be entered and exited to expose the ``span``. This is usually done by +using a ``with`` statement. + +.. code-block:: python + + from synapse.logging.tracing import start_active_span + + with start_active_span("operation name"): + # Do something we want to trace + +Forgetting to enter or exit a scope will result in unstarted and unfinished +spans that will not be reported (exported). + +At anytime where there is an active span ``set_attribute`` can be +used to set a tag on the current active span. + +Tracing functions +----------------- + +Functions can be easily traced using decorators. The name of the function +becomes the operation name for the span. + +.. code-block:: python + + from synapse.logging.tracing import trace + + # Start a span using 'interesting_function' as the operation name + @trace + def interesting_function(*args, **kwargs): + # Does all kinds of cool and expected things return + something_usual_and_useful + + +Operation names can be explicitly set for a function by using +``trace_with_opname``: + +.. code-block:: python + + from synapse.logging.tracing import trace_with_opname + + @trace_with_opname("a_better_operation_name") + def interesting_badly_named_function(*args, **kwargs): + # Does all kinds of cool and expected things return + something_usual_and_useful + +Setting Tags +------------ + +To set a tag on the active span do + +.. code-block:: python + + from synapse.logging.tracing import set_attribute + + set_attribute(tag_name, tag_value) + +There's a convenient decorator to tag all the args of the method. It uses +inspection in order to use the formal parameter names prefixed with 'ARG_' as +tag names. It uses kwarg names as tag names without the prefix. + +.. code-block:: python + from synapse.logging.tracing import tag_args + @tag_args + def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): + pass + set_fates("the story", "the end", "the act") + # This will have the following tags + # - ARG_clotho: "the story" + # - ARG_lachesis: "the end" + # - ARG_atropos: "the act" + # - father: "Zues" + # - mother: "Themis" + +Contexts and carriers +--------------------- + +There are a selection of wrappers for injecting and extracting contexts from +carriers provided. We use these to inject of OpenTelemetry Contexts into +Twisted's http headers, EDU contents and our database tables. Please refer to +the end of ``logging/tracing.py`` for the available injection and extraction +methods. + +Homeserver whitelisting +----------------------- + +Most of the whitelist checks are encapsulated in the modules's injection and +extraction method but be aware that using custom carriers or crossing +unchartered waters will require the enforcement of the whitelist. +``logging/tracing.py`` has a ``whitelisted_homeserver`` method which takes +in a destination and compares it to the whitelist. + +Most injection methods take a 'destination' arg. The context will only be +injected if the destination matches the whitelist or the destination is None. + +======= +Gotchas +======= + +- Checking whitelists on span propagation +- Inserting pii +- Forgetting to enter or exit a scope +- Span source: make sure that the span you expect to be active across a function + call really will be that one. Does the current function have more than one + caller? Will all of those calling functions have be in a context with an + active span? +""" +import contextlib +import inspect +import logging +import re +from abc import ABC +from functools import wraps +from typing import ( + TYPE_CHECKING, + Any, + Callable, + ContextManager, + Dict, + Generator, + Iterable, + List, + Optional, + Pattern, + Sequence, + TypeVar, + Union, + cast, + overload, +) + +from typing_extensions import ParamSpec + +from twisted.internet import defer +from twisted.web.http import Request +from twisted.web.http_headers import Headers + +from synapse.api.constants import EventContentFields +from synapse.config import ConfigError +from synapse.util import json_decoder + +if TYPE_CHECKING: + from synapse.http.site import SynapseRequest + from synapse.server import HomeServer + +# Helper class + +T = TypeVar("T") + +# Matches the number suffix in an instance name like "matrix.org client_reader-8" +STRIP_INSTANCE_NUMBER_SUFFIX_REGEX = re.compile(r"[_-]?\d+$") + + +class _DummyLookup(object): + """This will always returns the fixed value given for any accessed property""" + + def __init__(self, value: T) -> None: + self.value = value + + # type-ignore: Because mypy says "A function returning TypeVar should receive at + # least one argument containing the same Typevar" but this is just a dummy + # stand-in that doesn't need any input. + def __getattribute__(self, name: str) -> T: # type: ignore[type-var] + return object.__getattribute__(self, "value") + + +class DummyLink(ABC): + """Dummy placeholder for `opentelemetry.trace.Link`""" + + def __init__(self, *args: Any) -> None: + self.not_implemented_message = ( + "opentelemetry isn't installed so this is just a dummy link placeholder" + ) + + @property + def context(self) -> None: + raise NotImplementedError(self.not_implemented_message) + + @property + def attributes(self) -> None: + raise NotImplementedError(self.not_implemented_message) + + +# These dependencies are optional so they can fail to import +# and we +try: + import opentelemetry + import opentelemetry.exporter.jaeger.thrift + import opentelemetry.propagate + import opentelemetry.sdk.resources + import opentelemetry.sdk.trace + import opentelemetry.sdk.trace.export + import opentelemetry.semconv.trace + import opentelemetry.trace + import opentelemetry.trace.propagation + import opentelemetry.trace.status + + SpanKind = opentelemetry.trace.SpanKind + SpanAttributes = opentelemetry.semconv.trace.SpanAttributes + StatusCode = opentelemetry.trace.status.StatusCode + Link = opentelemetry.trace.Link +except ImportError: + opentelemetry = None # type: ignore[assignment] + SpanKind = _DummyLookup(0) # type: ignore + SpanAttributes = _DummyLookup("fake-attribute") # type: ignore + StatusCode = _DummyLookup(0) # type: ignore + Link = DummyLink # type: ignore + + +logger = logging.getLogger(__name__) + + +class SynapseTags: + """FIXME: Rename to `SynapseAttributes` so it matches OpenTelemetry `SpanAttributes`""" + + # The message ID of any to_device message processed + TO_DEVICE_MESSAGE_ID = "to_device.message_id" + + # Whether the sync response has new data to be returned to the client. + SYNC_RESULT = "sync.new_data" + + # The Synapse instance name + INSTANCE_NAME = "instance_name" + + # incoming HTTP request ID (as written in the logs) + REQUEST_ID = "request_id" + + # HTTP request tag (used to distinguish full vs incremental syncs, etc) + REQUEST_TAG = "request_tag" + + # Text description of a database transaction + DB_TXN_DESC = "db.txn_desc" + + # Uniqueish ID of a database transaction + DB_TXN_ID = "db.txn_id" + + # The name of the external cache + CACHE_NAME = "cache.name" + + # Used to tag function arguments + # + # Tag a named arg. The name of the argument should be appended to this prefix. + FUNC_ARG_PREFIX = "ARG." + # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`) + FUNC_ARGS = "args" + # Tag keyword args + FUNC_KWARGS = "kwargs" + + # Some intermediate result that's interesting to the function. The label for + # the result should be appended to this prefix. + RESULT_PREFIX = "RESULT." + + +class SynapseBaggage: + FORCE_TRACING = "synapse-force-tracing" + + +# Block everything by default +# A regex which matches the server_names to expose traces for. +# None means 'block everything'. +_homeserver_whitelist: Optional[Pattern[str]] = None + +# Util methods + + +P = ParamSpec("P") +R = TypeVar("R") + + +def only_if_tracing(func: Callable[P, R]) -> Callable[P, Optional[R]]: + """Decorator function that executes the function only if we're tracing. Otherwise returns None.""" + + @wraps(func) + def _only_if_tracing_inner(*args: P.args, **kwargs: P.kwargs) -> Optional[R]: + if opentelemetry: + return func(*args, **kwargs) + else: + return None + + return _only_if_tracing_inner + + +@overload +def ensure_active_span( + message: str, +) -> Callable[[Callable[P, R]], Callable[P, Optional[R]]]: + ... + + +@overload +def ensure_active_span( + message: str, ret: T +) -> Callable[[Callable[P, R]], Callable[P, Union[T, R]]]: + ... + + +def ensure_active_span( + message: str, ret: Optional[T] = None +) -> Callable[[Callable[P, R]], Callable[P, Union[Optional[T], R]]]: + """Executes the operation only if opentelemetry is enabled and there is an active span. + If there is no active span it logs message at the error level. + + Args: + message: Message which fills in "There was no active span when trying to %s" + in the error log if there is no active span and opentelemetry is enabled. + ret: return value if opentelemetry is None or there is no active span. + + Returns: + The result of the func, falling back to ret if opentelemetry is disabled or there + was no active span. + """ + + def ensure_active_span_inner_1( + func: Callable[P, R] + ) -> Callable[P, Union[Optional[T], R]]: + @wraps(func) + def ensure_active_span_inner_2( + *args: P.args, **kwargs: P.kwargs + ) -> Union[Optional[T], R]: + if not opentelemetry: + return ret + + if not opentelemetry.trace.get_current_span(): + logger.error( + "There was no active span when trying to %s." + " Did you forget to start one or did a context slip?", + message, + stack_info=True, + ) + + return ret + + return func(*args, **kwargs) + + return ensure_active_span_inner_2 + + return ensure_active_span_inner_1 + + +# Setup + + +def init_tracer(hs: "HomeServer") -> None: + """Set the whitelists and initialise the OpenTelemetry tracer""" + global opentelemetry + if not hs.config.tracing.tracing_enabled: + # We don't have a tracer + opentelemetry = None # type: ignore[assignment] + return + + if not opentelemetry: + raise ConfigError( + "The server has been configured to use OpenTelemetry but OpenTelemetry is not " + "installed." + ) + + # Pull out of the config if it was given. Otherwise set it to something sensible. + set_homeserver_whitelist(hs.config.tracing.homeserver_whitelist) + + # Instance names are opaque strings but by stripping off the number suffix, + # we can get something that looks like a "worker type", e.g. + # "client_reader-1" -> "client_reader" so we don't spread the traces across + # so many services. + instance_name_by_type = re.sub( + STRIP_INSTANCE_NUMBER_SUFFIX_REGEX, "", hs.get_instance_name() + ) + + resource = opentelemetry.sdk.resources.Resource( + attributes={ + opentelemetry.sdk.resources.SERVICE_NAME: f"{hs.config.server.server_name} {instance_name_by_type}" + } + ) + + # TODO: `force_tracing_for_users` is not compatible with OTEL samplers + # because you can only determine `opentelemetry.trace.TraceFlags.SAMPLED` + # and whether it uses a recording span when the span is created and we don't + # have enough information at that time (we can determine in + # `synapse/api/auth.py`). There isn't a way to change the trace flags after + # the fact so there is no way to programmatically force + # recording/tracing/sampling like there was in opentracing. + sampler = opentelemetry.sdk.trace.sampling.ParentBasedTraceIdRatio( + hs.config.tracing.sample_rate + ) + + tracer_provider = opentelemetry.sdk.trace.TracerProvider( + resource=resource, sampler=sampler + ) + + # consoleProcessor = opentelemetry.sdk.trace.export.BatchSpanProcessor( + # opentelemetry.sdk.trace.export.ConsoleSpanExporter() + # ) + # tracer_provider.add_span_processor(consoleProcessor) + + jaeger_exporter = opentelemetry.exporter.jaeger.thrift.JaegerExporter( + **hs.config.tracing.jaeger_exporter_config + ) + jaeger_processor = opentelemetry.sdk.trace.export.BatchSpanProcessor( + jaeger_exporter + ) + tracer_provider.add_span_processor(jaeger_processor) + + # Sets the global default tracer provider + opentelemetry.trace.set_tracer_provider(tracer_provider) + + +# Whitelisting + + +@only_if_tracing +def set_homeserver_whitelist(homeserver_whitelist: Iterable[str]) -> None: + """Sets the homeserver whitelist + + Args: + homeserver_whitelist: regexes specifying whitelisted homeservers + """ + global _homeserver_whitelist + if homeserver_whitelist: + # Makes a single regex which accepts all passed in regexes in the list + _homeserver_whitelist = re.compile( + "({})".format(")|(".join(homeserver_whitelist)) + ) + + +@only_if_tracing +def whitelisted_homeserver(destination: str) -> bool: + """Checks if a destination matches the whitelist + + Args: + destination + """ + + if _homeserver_whitelist: + return _homeserver_whitelist.match(destination) is not None + return False + + +# Start spans and scopes + + +def use_span( + span: Optional["opentelemetry.trace.Span"], + end_on_exit: bool = True, +) -> ContextManager[Optional["opentelemetry.trace.Span"]]: + if opentelemetry is None or span is None: + return contextlib.nullcontext() + + return opentelemetry.trace.use_span(span=span, end_on_exit=end_on_exit) + + +def create_non_recording_span() -> Optional["opentelemetry.trace.Span"]: + """Create a no-op span that does not record or become part of a recorded trace""" + if opentelemetry is None: + return None # type: ignore[unreachable] + + return opentelemetry.trace.NonRecordingSpan( + opentelemetry.trace.INVALID_SPAN_CONTEXT + ) + + +def start_span( + name: str, + *, + context: Optional["opentelemetry.context.context.Context"] = None, + kind: Optional["opentelemetry.trace.SpanKind"] = SpanKind.INTERNAL, + attributes: "opentelemetry.util.types.Attributes" = None, + links: Optional[Sequence["opentelemetry.trace.Link"]] = None, + start_time: Optional[int] = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + end_on_exit: bool = True, + # For testing only + tracer: Optional["opentelemetry.trace.Tracer"] = None, +) -> "opentelemetry.trace.Span": + if opentelemetry is None: + raise Exception("Not able to create span without opentelemetry installed.") + + if tracer is None: + tracer = opentelemetry.trace.get_tracer(__name__) + + # TODO: Why is this necessary to satisfy this error? It has a default? + # ` error: Argument "kind" to "start_span" of "Tracer" has incompatible type "Optional[SpanKind]"; expected "SpanKind" [arg-type]` + if kind is None: + kind = SpanKind.INTERNAL + + return tracer.start_span( + name=name, + context=context, + kind=kind, + attributes=attributes, + links=links, + start_time=start_time, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + ) + + +def start_active_span( + name: str, + *, + context: Optional["opentelemetry.context.context.Context"] = None, + kind: "opentelemetry.trace.SpanKind" = SpanKind.INTERNAL, + attributes: "opentelemetry.util.types.Attributes" = None, + links: Optional[Sequence["opentelemetry.trace.Link"]] = None, + start_time: Optional[int] = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + end_on_exit: bool = True, + # For testing only + tracer: Optional["opentelemetry.trace.Tracer"] = None, +) -> ContextManager[Optional["opentelemetry.trace.Span"]]: + if opentelemetry is None: + return contextlib.nullcontext() # type: ignore[unreachable] + + span = start_span( + name=name, + context=context, + kind=kind, + attributes=attributes, + links=links, + start_time=start_time, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + tracer=tracer, + ) + + # Equivalent to `tracer.start_as_current_span` + return opentelemetry.trace.use_span( + span, + end_on_exit=end_on_exit, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + ) + + +def start_active_span_from_edu( + operation_name: str, + *, + edu_content: Dict[str, Any], +) -> ContextManager[Optional["opentelemetry.trace.Span"]]: + """ + Extracts a span context from an edu and uses it to start a new active span + + Args: + operation_name: The label for the chunk of time used to process the given edu. + edu_content: an edu_content with a `context` field whose value is + canonical json for a dict which contains tracing information. + """ + if opentelemetry is None: + return contextlib.nullcontext() # type: ignore[unreachable] + + carrier = json_decoder.decode( + edu_content.get(EventContentFields.TRACING_CONTEXT, "{}") + ) + + context = extract_text_map(carrier) + + return start_active_span(name=operation_name, context=context) + + +# OpenTelemetry setters for attributes, logs, etc +@only_if_tracing +def get_active_span() -> Optional["opentelemetry.trace.Span"]: + """Get the currently active span, if any""" + return opentelemetry.trace.get_current_span() + + +def get_span_context_from_context( + context: "opentelemetry.context.context.Context", +) -> Optional["opentelemetry.trace.SpanContext"]: + """Utility function to convert a `Context` to a `SpanContext` + + Based on https://github.com/open-telemetry/opentelemetry-python/blob/43288ca9a36144668797c11ca2654836ec8b5e99/opentelemetry-api/src/opentelemetry/trace/propagation/tracecontext.py#L99-L102 + """ + span = opentelemetry.trace.get_current_span(context=context) + span_context = span.get_span_context() + if span_context == opentelemetry.trace.INVALID_SPAN_CONTEXT: + return None + return span_context + + +def get_context_from_span( + span: "opentelemetry.trace.Span", +) -> "opentelemetry.context.context.Context": + # This doesn't affect the current context at all, it just converts a span + # into `Context` object basically (bad name). + ctx = opentelemetry.trace.propagation.set_span_in_context(span) + return ctx + + +@ensure_active_span("set a tag") +def set_attribute(key: str, value: Union[str, bool, int, float]) -> None: + """Sets a tag on the active span""" + active_span = get_active_span() + assert active_span is not None + active_span.set_attribute(key, value) + + +@ensure_active_span("set the status") +def set_status( + status_code: "opentelemetry.trace.status.StatusCode", exc: Optional[Exception] +) -> None: + """Sets a tag on the active span""" + active_span = get_active_span() + assert active_span is not None + active_span.set_status(opentelemetry.trace.status.Status(status_code=status_code)) + if exc: + active_span.record_exception(exc) + + +DEFAULT_LOG_NAME = "log" + + +@ensure_active_span("log") +def log_kv(key_values: Dict[str, Any], timestamp: Optional[int] = None) -> None: + """Log to the active span""" + active_span = get_active_span() + assert active_span is not None + event_name = key_values.get("event", DEFAULT_LOG_NAME) + active_span.add_event(event_name, attributes=key_values, timestamp=timestamp) + + +@only_if_tracing +def force_tracing(span: Optional["opentelemetry.trace.Span"] = None) -> None: + """Force sampling for the active/given span and its children. + + Args: + span: span to force tracing for. By default, the active span. + """ + # TODO + pass + + +def is_context_forced_tracing( + context: "opentelemetry.context.context.Context", +) -> bool: + """Check if sampling has been force for the given span context.""" + # TODO + return False + + +# Injection and extraction + + +@ensure_active_span("inject the active tracing context into a header dict") +def inject_active_tracing_context_into_header_dict( + headers: Dict[bytes, List[bytes]], + destination: Optional[str] = None, + check_destination: bool = True, +) -> None: + """ + Injects the active tracing context into a dict of HTTP headers + + Args: + headers: the dict to inject headers into + destination: address of entity receiving the span context. Must be given unless + `check_destination` is False. + check_destination (bool): If False, destination will be ignored and the context + will always be injected. If True, the context will only be injected if the + destination matches the tracing allowlist + + Note: + The headers set by the tracer are custom to the tracer implementation which + should be unique enough that they don't interfere with any headers set by + synapse or twisted. + """ + if check_destination: + if destination is None: + raise ValueError( + "destination must be given unless check_destination is False" + ) + if not whitelisted_homeserver(destination): + return + + active_span = get_active_span() + assert active_span is not None + ctx = get_context_from_span(active_span) + + propagator = opentelemetry.propagate.get_global_textmap() + # Put all of SpanContext properties into the headers dict + propagator.inject(headers, context=ctx) + + +def inject_trace_id_into_response_headers(response_headers: Headers) -> None: + """Inject the current trace id into the HTTP response headers""" + if not opentelemetry: + return + active_span = get_active_span() + if not active_span: + return + + trace_id = active_span.get_span_context().trace_id + + if trace_id is not None: + response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}") + + +@ensure_active_span( + "get the active span context as a dict", ret=cast(Dict[str, str], {}) +) +def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str]: + """ + Gets the active tracing Context serialized as a dict. This can be used + instead of manually injecting a span into an empty carrier. + + Args: + destination: the name of the remote server. + + Returns: + dict: the serialized active span's context if opentelemetry is enabled, otherwise + empty. + """ + if destination and not whitelisted_homeserver(destination): + return {} + + active_span = get_active_span() + assert active_span is not None + ctx = get_context_from_span(active_span) + + carrier_text_map: Dict[str, str] = {} + propagator = opentelemetry.propagate.get_global_textmap() + # Put all of Context properties onto the carrier text map that we can return + propagator.inject(carrier_text_map, context=ctx) + + return carrier_text_map + + +def context_from_request( + request: Request, +) -> Optional["opentelemetry.context.context.Context"]: + """Extract an opentelemetry context from the headers on an HTTP request + + This is useful when we have received an HTTP request from another part of our + system, and want to link our spans to those of the remote system. + """ + if not opentelemetry: + return None + header_dict = { + k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() + } + + # Extract all of the relevant values from the headers to construct a + # SpanContext to return. + return extract_text_map(header_dict) + + +@only_if_tracing +def extract_text_map( + carrier: Dict[str, str] +) -> Optional["opentelemetry.context.context.Context"]: + """ + Wrapper method for opentelemetry's propagator.extract for TEXT_MAP. + Args: + carrier: a dict possibly containing a context. + + Returns: + The active context extracted from carrier. + """ + propagator = opentelemetry.propagate.get_global_textmap() + # Extract all of the relevant values from the `carrier` to construct a + # Context to return. + return propagator.extract(carrier) + + +# Tracing decorators + + +def _custom_sync_async_decorator( + func: Callable[P, R], + wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]], +) -> Callable[P, R]: + """ + Decorates a function that is sync or async (coroutines), or that returns a Twisted + `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`. + Example usage: + ```py + # Decorator to time the function and log it out + def duration(func: Callable[P, R]) -> Callable[P, R]: + @contextlib.contextmanager + def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Generator[None, None, None]: + start_ts = time.time() + try: + yield + finally: + end_ts = time.time() + duration = end_ts - start_ts + logger.info("%s took %s seconds", func.__name__, duration) + return _custom_sync_async_decorator(func, _wrapping_logic) + ``` + Args: + func: The function to be decorated + wrapping_logic: The business logic of your custom decorator. + This should be a ContextManager so you are able to run your logic + before/after the function as desired. + """ + + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + with wrapping_logic(func, *args, **kwargs): + return await func(*args, **kwargs) # type: ignore[misc] + + else: + # The other case here handles both sync functions and those + # decorated with inlineDeferred. + @wraps(func) + def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + scope = wrapping_logic(func, *args, **kwargs) + scope.__enter__() + + try: + result = func(*args, **kwargs) + if isinstance(result, defer.Deferred): + + def call_back(result: R) -> R: + scope.__exit__(None, None, None) + return result + + def err_back(result: R) -> R: + scope.__exit__(None, None, None) + return result + + result.addCallbacks(call_back, err_back) + + else: + if inspect.isawaitable(result): + logger.error( + "@trace may not have wrapped %s correctly! " + "The function is not async but returned a %s.", + func.__qualname__, + type(result).__name__, + ) + + scope.__exit__(None, None, None) + + return result + + except Exception as e: + scope.__exit__(type(e), None, e.__traceback__) + raise + + return _wrapper # type: ignore[return-value] + + +def trace_with_opname( + opname: str, + *, + tracer: Optional["opentelemetry.trace.Tracer"] = None, +) -> Callable[[Callable[P, R]], Callable[P, R]]: + """ + Decorator to trace a function with a custom opname. + See the module's doc string for usage examples. + """ + # type-ignore: mypy bug, see https://github.com/python/mypy/issues/12909 + @contextlib.contextmanager # type: ignore[arg-type] + def _wrapping_logic( + func: Callable[P, R], *args: P.args, **kwargs: P.kwargs + ) -> Generator[None, None, None]: + with start_active_span(opname, tracer=tracer): + yield + + def _decorator(func: Callable[P, R]) -> Callable[P, R]: + if not opentelemetry: + return func + + return _custom_sync_async_decorator(func, _wrapping_logic) + + return _decorator + + +def trace(func: Callable[P, R]) -> Callable[P, R]: + """ + Decorator to trace a function. + + Sets the operation name to that of the function's name. + + See the module's doc string for usage examples. + """ + + return trace_with_opname(func.__name__)(func) + + +def tag_args(func: Callable[P, R]) -> Callable[P, R]: + """ + Tags all of the args to the active span. + """ + + if not opentelemetry: + return func + + # type-ignore: mypy bug, see https://github.com/python/mypy/issues/12909 + @contextlib.contextmanager # type: ignore[arg-type] + def _wrapping_logic( + func: Callable[P, R], *args: P.args, **kwargs: P.kwargs + ) -> Generator[None, None, None]: + argspec = inspect.getfullargspec(func) + # We use `[1:]` to skip the `self` object reference and `start=1` to + # make the index line up with `argspec.args`. + # + # FIXME: We could update this to handle any type of function by ignoring the + # first argument only if it's named `self` or `cls`. This isn't fool-proof + # but handles the idiomatic cases. + for i, arg in enumerate(args[1:], start=1): + set_attribute(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i], str(arg)) + set_attribute(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :])) + set_attribute(SynapseTags.FUNC_KWARGS, str(kwargs)) + yield + + return _custom_sync_async_decorator(func, _wrapping_logic) + + +@contextlib.contextmanager +def trace_servlet( + request: "SynapseRequest", extract_context: bool = False +) -> Generator[None, None, None]: + """Returns a context manager which traces a request. It starts a span + with some servlet specific tags such as the request metrics name and + request information. + + Args: + request + extract_context: Whether to attempt to extract the tracing + context from the request the servlet is handling. + """ + + if opentelemetry is None: + yield # type: ignore[unreachable] + return + + attrs = { + SynapseTags.REQUEST_ID: request.get_request_id(), + SpanAttributes.HTTP_METHOD: request.get_method(), + SpanAttributes.HTTP_URL: request.get_redacted_uri(), + SpanAttributes.HTTP_HOST: request.getClientAddress().host, + } + + request_name = request.request_metrics.name + tracing_context = context_from_request(request) if extract_context else None + + # This is will end up being the root span for all of servlet traces and we + # aren't able to determine whether to force tracing yet. We can determine + # whether to force trace later in `synapse/api/auth.py`. + with start_active_span( + request_name, + kind=SpanKind.SERVER, + context=tracing_context, + attributes=attrs, + # we configure the span not to finish immediately on exiting the scope, + # and instead pass the span into the SynapseRequest (via + # `request.set_tracing_span(span)`), which will finish it once we've + # finished sending the response to the client. + end_on_exit=False, + ) as span: + if span: + request.set_tracing_span(span) + + inject_trace_id_into_response_headers(request.responseHeaders) + try: + yield + finally: + if span: + # We set the operation name again in case its changed (which happens + # with JsonResource). + span.update_name(request.request_metrics.name) + + if request.request_metrics.start_context.tag is not None: + span.set_attribute( + SynapseTags.REQUEST_TAG, + request.request_metrics.start_context.tag, + )