summary refs log tree commit diff
path: root/synapse/logging
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/logging')
-rw-r--r--synapse/logging/context.py59
-rw-r--r--synapse/logging/opentracing.py211
2 files changed, 176 insertions, 94 deletions
diff --git a/synapse/logging/context.py b/synapse/logging/context.py

index fd9cb97920..f62bea968f 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py
@@ -117,8 +117,7 @@ class ContextResourceUsage: """Create a new ContextResourceUsage Args: - copy_from (ContextResourceUsage|None): if not None, an object to - copy stats from + copy_from: if not None, an object to copy stats from """ if copy_from is None: self.reset() @@ -162,7 +161,7 @@ class ContextResourceUsage: """Add another ContextResourceUsage's stats to this one's. Args: - other (ContextResourceUsage): the other resource usage object + other: the other resource usage object """ self.ru_utime += other.ru_utime self.ru_stime += other.ru_stime @@ -342,7 +341,7 @@ class LoggingContext: called directly. Returns: - LoggingContext: the current logging context + The current logging context """ warnings.warn( "synapse.logging.context.LoggingContext.current_context() is deprecated " @@ -362,7 +361,8 @@ class LoggingContext: called directly. Args: - context(LoggingContext): The context to activate. + context: The context to activate. + Returns: The context that was previously active """ @@ -474,8 +474,7 @@ class LoggingContext: """Get resources used by this logcontext so far. Returns: - ContextResourceUsage: a *copy* of the object tracking resource - usage so far + A *copy* of the object tracking resource usage so far """ # we always return a copy, for consistency res = self._resource_usage.copy() @@ -586,7 +585,7 @@ class LoggingContextFilter(logging.Filter): True to include the record in the log output. """ context = current_context() - record.request = self._default_request # type: ignore + record.request = self._default_request # context should never be None, but if it somehow ends up being, then # we end up in a death spiral of infinite loops, so let's check, for @@ -594,21 +593,21 @@ class LoggingContextFilter(logging.Filter): if context is not None: # Logging is interested in the request ID. Note that for backwards # compatibility this is stored as the "request" on the record. - record.request = str(context) # type: ignore + record.request = str(context) # Add some data from the HTTP request. request = context.request if request is None: return True - record.ip_address = request.ip_address # type: ignore - record.site_tag = request.site_tag # type: ignore - record.requester = request.requester # type: ignore - record.authenticated_entity = request.authenticated_entity # type: ignore - record.method = request.method # type: ignore - record.url = request.url # type: ignore - record.protocol = request.protocol # type: ignore - record.user_agent = request.user_agent # type: ignore + record.ip_address = request.ip_address + record.site_tag = request.site_tag + record.requester = request.requester + record.authenticated_entity = request.authenticated_entity + record.method = request.method + record.url = request.url + record.protocol = request.protocol + record.user_agent = request.user_agent return True @@ -663,7 +662,8 @@ def current_context() -> LoggingContextOrSentinel: def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel: """Set the current logging context in thread local storage Args: - context(LoggingContext): The context to activate. + context: The context to activate. + Returns: The context that was previously active """ @@ -700,7 +700,7 @@ def nested_logging_context(suffix: str) -> LoggingContext: suffix: suffix to add to the parent context's 'name'. Returns: - LoggingContext: new logging context. + A new logging context. """ curr_context = current_context() if not curr_context: @@ -898,20 +898,19 @@ def defer_to_thread( on it. Args: - reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread - the Deferred will be invoked, and whose threadpool we should use for the - function. + reactor: The reactor in whose main thread the Deferred will be invoked, + and whose threadpool we should use for the function. Normally this will be hs.get_reactor(). - f (callable): The function to call. + f: The function to call. args: positional arguments to pass to f. kwargs: keyword arguments to pass to f. Returns: - Deferred: A Deferred which fires a callback with the result of `f`, or an + A Deferred which fires a callback with the result of `f`, or an errback if `f` throws an exception. """ return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs) @@ -939,20 +938,20 @@ def defer_to_threadpool( on it. Args: - reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread - the Deferred will be invoked. Normally this will be hs.get_reactor(). + reactor: The reactor in whose main thread the Deferred will be invoked. + Normally this will be hs.get_reactor(). - threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for - running `f`. Normally this will be hs.get_reactor().getThreadPool(). + threadpool: The threadpool to use for running `f`. Normally this will be + hs.get_reactor().getThreadPool(). - f (callable): The function to call. + f: The function to call. args: positional arguments to pass to f. kwargs: keyword arguments to pass to f. Returns: - Deferred: A Deferred which fires a callback with the result of `f`, or an + A Deferred which fires a callback with the result of `f`, or an errback if `f` throws an exception. """ curr_context = current_context() diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index c1aa205eed..b69060854f 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py
@@ -173,6 +173,7 @@ from typing import ( Any, Callable, Collection, + ContextManager, Dict, Generator, Iterable, @@ -202,6 +203,9 @@ if TYPE_CHECKING: # Helper class +# Matches the number suffix in an instance name like "matrix.org client_reader-8" +STRIP_INSTANCE_NUMBER_SUFFIX_REGEX = re.compile(r"[_-]?\d+$") + class _DummyTagNames: """wrapper of opentracings tags. We need to have them if we @@ -294,6 +298,8 @@ class SynapseTags: # Whether the sync response has new data to be returned to the client. SYNC_RESULT = "sync.new_data" + INSTANCE_NAME = "instance_name" + # incoming HTTP request ID (as written in the logs) REQUEST_ID = "request_id" @@ -309,6 +315,19 @@ class SynapseTags: # The name of the external cache CACHE_NAME = "cache.name" + # Used to tag function arguments + # + # Tag a named arg. The name of the argument should be appended to this prefix. + FUNC_ARG_PREFIX = "ARG." + # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`) + FUNC_ARGS = "args" + # Tag keyword args + FUNC_KWARGS = "kwargs" + + # Some intermediate result that's interesting to the function. The label for + # the result should be appended to this prefix. + RESULT_PREFIX = "RESULT." + class SynapseBaggage: FORCE_TRACING = "synapse-force-tracing" @@ -427,9 +446,17 @@ def init_tracer(hs: "HomeServer") -> None: from jaeger_client.metrics.prometheus import PrometheusMetricsFactory + # Instance names are opaque strings but by stripping off the number suffix, + # we can get something that looks like a "worker type", e.g. + # "client_reader-1" -> "client_reader" so we don't spread the traces across + # so many services. + instance_name_by_type = re.sub( + STRIP_INSTANCE_NUMBER_SUFFIX_REGEX, "", hs.get_instance_name() + ) + config = JaegerConfig( config=hs.config.tracing.jaeger_config, - service_name=f"{hs.config.server.server_name} {hs.get_instance_name()}", + service_name=f"{hs.config.server.server_name} {instance_name_by_type}", scope_manager=LogContextScopeManager(), metrics_factory=PrometheusMetricsFactory(), ) @@ -694,7 +721,7 @@ def inject_header_dict( destination: address of entity receiving the span context. Must be given unless check_destination is False. The context will only be injected if the destination matches the opentracing whitelist - check_destination (bool): If false, destination will be ignored and the context + check_destination: If false, destination will be ignored and the context will always be injected. Note: @@ -753,7 +780,7 @@ def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str destination: the name of the remote server. Returns: - dict: the active span's context if opentracing is enabled, otherwise empty. + the active span's context if opentracing is enabled, otherwise empty. """ if destination and not whitelisted_homeserver(destination): @@ -823,75 +850,117 @@ def extract_text_map(carrier: Dict[str, str]) -> Optional["opentracing.SpanConte # Tracing decorators -def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]: +def _custom_sync_async_decorator( + func: Callable[P, R], + wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]], +) -> Callable[P, R]: """ - Decorator to trace a function with a custom opname. - - See the module's doc string for usage examples. + Decorates a function that is sync or async (coroutines), or that returns a Twisted + `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`. + + Example usage: + ```py + # Decorator to time the function and log it out + def duration(func: Callable[P, R]) -> Callable[P, R]: + @contextlib.contextmanager + def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Generator[None, None, None]: + start_ts = time.time() + try: + yield + finally: + end_ts = time.time() + duration = end_ts - start_ts + logger.info("%s took %s seconds", func.__name__, duration) + return _custom_sync_async_decorator(func, _wrapping_logic) + ``` + Args: + func: The function to be decorated + wrapping_logic: The business logic of your custom decorator. + This should be a ContextManager so you are able to run your logic + before/after the function as desired. """ - def decorator(func: Callable[P, R]) -> Callable[P, R]: - if opentracing is None: - return func # type: ignore[unreachable] + if inspect.iscoroutinefunction(func): - if inspect.iscoroutinefunction(func): + @wraps(func) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + with wrapping_logic(func, *args, **kwargs): + return await func(*args, **kwargs) # type: ignore[misc] - @wraps(func) - async def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R: - with start_active_span(opname): - return await func(*args, **kwargs) # type: ignore[misc] + else: + # The other case here handles both sync functions and those + # decorated with inlineDeferred. + @wraps(func) + def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + scope = wrapping_logic(func, *args, **kwargs) + scope.__enter__() - else: - # The other case here handles both sync functions and those - # decorated with inlineDeferred. - @wraps(func) - def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R: - scope = start_active_span(opname) - scope.__enter__() - - try: - result = func(*args, **kwargs) - if isinstance(result, defer.Deferred): - - def call_back(result: R) -> R: - scope.__exit__(None, None, None) - return result - - def err_back(result: R) -> R: - scope.__exit__(None, None, None) - return result - - result.addCallbacks(call_back, err_back) - - else: - if inspect.isawaitable(result): - logger.error( - "@trace may not have wrapped %s correctly! " - "The function is not async but returned a %s.", - func.__qualname__, - type(result).__name__, - ) + try: + result = func(*args, **kwargs) + if isinstance(result, defer.Deferred): + def call_back(result: R) -> R: scope.__exit__(None, None, None) + return result - return result + def err_back(result: R) -> R: + scope.__exit__(None, None, None) + return result + + result.addCallbacks(call_back, err_back) + + else: + if inspect.isawaitable(result): + logger.error( + "@trace may not have wrapped %s correctly! " + "The function is not async but returned a %s.", + func.__qualname__, + type(result).__name__, + ) - except Exception as e: - scope.__exit__(type(e), None, e.__traceback__) - raise + scope.__exit__(None, None, None) - return _trace_inner # type: ignore[return-value] + return result - return decorator + except Exception as e: + scope.__exit__(type(e), None, e.__traceback__) + raise + + return _wrapper # type: ignore[return-value] + + +def trace_with_opname( + opname: str, + *, + tracer: Optional["opentracing.Tracer"] = None, +) -> Callable[[Callable[P, R]], Callable[P, R]]: + """ + Decorator to trace a function with a custom opname. + See the module's doc string for usage examples. + """ + + # type-ignore: mypy bug, see https://github.com/python/mypy/issues/12909 + @contextlib.contextmanager # type: ignore[arg-type] + def _wrapping_logic( + func: Callable[P, R], *args: P.args, **kwargs: P.kwargs + ) -> Generator[None, None, None]: + with start_active_span(opname, tracer=tracer): + yield + + def _decorator(func: Callable[P, R]) -> Callable[P, R]: + if not opentracing: + return func + + return _custom_sync_async_decorator(func, _wrapping_logic) + + return _decorator def trace(func: Callable[P, R]) -> Callable[P, R]: """ Decorator to trace a function. - Sets the operation name to that of the function's name. - See the module's doc string for usage examples. """ @@ -900,22 +969,36 @@ def trace(func: Callable[P, R]) -> Callable[P, R]: def tag_args(func: Callable[P, R]) -> Callable[P, R]: """ - Tags all of the args to the active span. + Decorator to tag all of the args to the active span. + + Args: + func: `func` is assumed to be a method taking a `self` parameter, or a + `classmethod` taking a `cls` parameter. In either case, a tag is not + created for this parameter. """ if not opentracing: return func - @wraps(func) - def _tag_args_inner(*args: P.args, **kwargs: P.kwargs) -> R: + # type-ignore: mypy bug, see https://github.com/python/mypy/issues/12909 + @contextlib.contextmanager # type: ignore[arg-type] + def _wrapping_logic( + func: Callable[P, R], *args: P.args, **kwargs: P.kwargs + ) -> Generator[None, None, None]: argspec = inspect.getfullargspec(func) - for i, arg in enumerate(argspec.args[1:]): - set_tag("ARG_" + arg, str(args[i])) # type: ignore[index] - set_tag("args", str(args[len(argspec.args) :])) # type: ignore[index] - set_tag("kwargs", str(kwargs)) - return func(*args, **kwargs) + # We use `[1:]` to skip the `self` object reference and `start=1` to + # make the index line up with `argspec.args`. + # + # FIXME: We could update this to handle any type of function by ignoring the + # first argument only if it's named `self` or `cls`. This isn't fool-proof + # but handles the idiomatic cases. + for i, arg in enumerate(args[1:], start=1): + set_tag(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i], str(arg)) + set_tag(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :])) + set_tag(SynapseTags.FUNC_KWARGS, str(kwargs)) + yield - return _tag_args_inner + return _custom_sync_async_decorator(func, _wrapping_logic) @contextlib.contextmanager @@ -962,11 +1045,11 @@ def trace_servlet( # with JsonResource). scope.span.set_operation_name(request.request_metrics.name) - # set the tags *after* the servlet completes, in case it decided to - # prioritise the span (tags will get dropped on unprioritised spans) request_tags[ SynapseTags.REQUEST_TAG ] = request.request_metrics.start_context.tag + # set the tags *after* the servlet completes, in case it decided to + # prioritise the span (tags will get dropped on unprioritised spans) for k, v in request_tags.items(): scope.span.set_tag(k, v)