diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index fd9cb97920..f62bea968f 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -117,8 +117,7 @@ class ContextResourceUsage:
"""Create a new ContextResourceUsage
Args:
- copy_from (ContextResourceUsage|None): if not None, an object to
- copy stats from
+ copy_from: if not None, an object to copy stats from
"""
if copy_from is None:
self.reset()
@@ -162,7 +161,7 @@ class ContextResourceUsage:
"""Add another ContextResourceUsage's stats to this one's.
Args:
- other (ContextResourceUsage): the other resource usage object
+ other: the other resource usage object
"""
self.ru_utime += other.ru_utime
self.ru_stime += other.ru_stime
@@ -342,7 +341,7 @@ class LoggingContext:
called directly.
Returns:
- LoggingContext: the current logging context
+ The current logging context
"""
warnings.warn(
"synapse.logging.context.LoggingContext.current_context() is deprecated "
@@ -362,7 +361,8 @@ class LoggingContext:
called directly.
Args:
- context(LoggingContext): The context to activate.
+ context: The context to activate.
+
Returns:
The context that was previously active
"""
@@ -474,8 +474,7 @@ class LoggingContext:
"""Get resources used by this logcontext so far.
Returns:
- ContextResourceUsage: a *copy* of the object tracking resource
- usage so far
+ A *copy* of the object tracking resource usage so far
"""
# we always return a copy, for consistency
res = self._resource_usage.copy()
@@ -586,7 +585,7 @@ class LoggingContextFilter(logging.Filter):
True to include the record in the log output.
"""
context = current_context()
- record.request = self._default_request # type: ignore
+ record.request = self._default_request
# context should never be None, but if it somehow ends up being, then
# we end up in a death spiral of infinite loops, so let's check, for
@@ -594,21 +593,21 @@ class LoggingContextFilter(logging.Filter):
if context is not None:
# Logging is interested in the request ID. Note that for backwards
# compatibility this is stored as the "request" on the record.
- record.request = str(context) # type: ignore
+ record.request = str(context)
# Add some data from the HTTP request.
request = context.request
if request is None:
return True
- record.ip_address = request.ip_address # type: ignore
- record.site_tag = request.site_tag # type: ignore
- record.requester = request.requester # type: ignore
- record.authenticated_entity = request.authenticated_entity # type: ignore
- record.method = request.method # type: ignore
- record.url = request.url # type: ignore
- record.protocol = request.protocol # type: ignore
- record.user_agent = request.user_agent # type: ignore
+ record.ip_address = request.ip_address
+ record.site_tag = request.site_tag
+ record.requester = request.requester
+ record.authenticated_entity = request.authenticated_entity
+ record.method = request.method
+ record.url = request.url
+ record.protocol = request.protocol
+ record.user_agent = request.user_agent
return True
@@ -663,7 +662,8 @@ def current_context() -> LoggingContextOrSentinel:
def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel:
"""Set the current logging context in thread local storage
Args:
- context(LoggingContext): The context to activate.
+ context: The context to activate.
+
Returns:
The context that was previously active
"""
@@ -700,7 +700,7 @@ def nested_logging_context(suffix: str) -> LoggingContext:
suffix: suffix to add to the parent context's 'name'.
Returns:
- LoggingContext: new logging context.
+ A new logging context.
"""
curr_context = current_context()
if not curr_context:
@@ -898,20 +898,19 @@ def defer_to_thread(
on it.
Args:
- reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
- the Deferred will be invoked, and whose threadpool we should use for the
- function.
+ reactor: The reactor in whose main thread the Deferred will be invoked,
+ and whose threadpool we should use for the function.
Normally this will be hs.get_reactor().
- f (callable): The function to call.
+ f: The function to call.
args: positional arguments to pass to f.
kwargs: keyword arguments to pass to f.
Returns:
- Deferred: A Deferred which fires a callback with the result of `f`, or an
+ A Deferred which fires a callback with the result of `f`, or an
errback if `f` throws an exception.
"""
return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
@@ -939,20 +938,20 @@ def defer_to_threadpool(
on it.
Args:
- reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
- the Deferred will be invoked. Normally this will be hs.get_reactor().
+ reactor: The reactor in whose main thread the Deferred will be invoked.
+ Normally this will be hs.get_reactor().
- threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
- running `f`. Normally this will be hs.get_reactor().getThreadPool().
+ threadpool: The threadpool to use for running `f`. Normally this will be
+ hs.get_reactor().getThreadPool().
- f (callable): The function to call.
+ f: The function to call.
args: positional arguments to pass to f.
kwargs: keyword arguments to pass to f.
Returns:
- Deferred: A Deferred which fires a callback with the result of `f`, or an
+ A Deferred which fires a callback with the result of `f`, or an
errback if `f` throws an exception.
"""
curr_context = current_context()
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index c1aa205eed..b69060854f 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -173,6 +173,7 @@ from typing import (
Any,
Callable,
Collection,
+ ContextManager,
Dict,
Generator,
Iterable,
@@ -202,6 +203,9 @@ if TYPE_CHECKING:
# Helper class
+# Matches the trailing number suffix (e.g. "-8") of an instance name like "client_reader-8"
+STRIP_INSTANCE_NUMBER_SUFFIX_REGEX = re.compile(r"[_-]?\d+$")
+
class _DummyTagNames:
"""wrapper of opentracings tags. We need to have them if we
@@ -294,6 +298,8 @@ class SynapseTags:
# Whether the sync response has new data to be returned to the client.
SYNC_RESULT = "sync.new_data"
+ INSTANCE_NAME = "instance_name"
+
# incoming HTTP request ID (as written in the logs)
REQUEST_ID = "request_id"
@@ -309,6 +315,19 @@ class SynapseTags:
# The name of the external cache
CACHE_NAME = "cache.name"
+ # Used to tag function arguments
+ #
+ # Tag a named arg. The name of the argument should be appended to this prefix.
+ FUNC_ARG_PREFIX = "ARG."
+ # Tag the extra variadic positional arguments (`def foo(first, second, *extras)`)
+ FUNC_ARGS = "args"
+ # Tag keyword args
+ FUNC_KWARGS = "kwargs"
+
+ # Some intermediate result that's interesting to the function. The label for
+ # the result should be appended to this prefix.
+ RESULT_PREFIX = "RESULT."
+
class SynapseBaggage:
FORCE_TRACING = "synapse-force-tracing"
@@ -427,9 +446,17 @@ def init_tracer(hs: "HomeServer") -> None:
from jaeger_client.metrics.prometheus import PrometheusMetricsFactory
+ # Instance names are opaque strings but by stripping off the number suffix,
+ # we can get something that looks like a "worker type", e.g.
+ # "client_reader-1" -> "client_reader" so we don't spread the traces across
+ # so many services.
+ instance_name_by_type = re.sub(
+ STRIP_INSTANCE_NUMBER_SUFFIX_REGEX, "", hs.get_instance_name()
+ )
+
config = JaegerConfig(
config=hs.config.tracing.jaeger_config,
- service_name=f"{hs.config.server.server_name} {hs.get_instance_name()}",
+ service_name=f"{hs.config.server.server_name} {instance_name_by_type}",
scope_manager=LogContextScopeManager(),
metrics_factory=PrometheusMetricsFactory(),
)
@@ -694,7 +721,7 @@ def inject_header_dict(
destination: address of entity receiving the span context. Must be given unless
check_destination is False. The context will only be injected if the
destination matches the opentracing whitelist
- check_destination (bool): If false, destination will be ignored and the context
+ check_destination: If false, destination will be ignored and the context
will always be injected.
Note:
@@ -753,7 +780,7 @@ def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str
destination: the name of the remote server.
Returns:
- dict: the active span's context if opentracing is enabled, otherwise empty.
+ The active span's context if opentracing is enabled, otherwise an empty dict.
"""
if destination and not whitelisted_homeserver(destination):
@@ -823,75 +850,117 @@ def extract_text_map(carrier: Dict[str, str]) -> Optional["opentracing.SpanConte
# Tracing decorators
-def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]:
+def _custom_sync_async_decorator(
+ func: Callable[P, R],
+ wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]],
+) -> Callable[P, R]:
"""
- Decorator to trace a function with a custom opname.
-
- See the module's doc string for usage examples.
+ Decorates a function that is sync or async (coroutines), or that returns a Twisted
+ `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`.
+
+ Example usage:
+ ```py
+ # Decorator to time the function and log it out
+ def duration(func: Callable[P, R]) -> Callable[P, R]:
+ @contextlib.contextmanager
+ def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Generator[None, None, None]:
+ start_ts = time.time()
+ try:
+ yield
+ finally:
+ end_ts = time.time()
+ duration = end_ts - start_ts
+ logger.info("%s took %s seconds", func.__name__, duration)
+ return _custom_sync_async_decorator(func, _wrapping_logic)
+ ```
+ Args:
+ func: The function to be decorated
+ wrapping_logic: The business logic of your custom decorator. This is called
+ as `wrapping_logic(func, *args, **kwargs)` and should return a ContextManager
+ so you are able to run your logic before/after the function as desired.
"""
- def decorator(func: Callable[P, R]) -> Callable[P, R]:
- if opentracing is None:
- return func # type: ignore[unreachable]
+ if inspect.iscoroutinefunction(func):
- if inspect.iscoroutinefunction(func):
+ @wraps(func)
+ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+ with wrapping_logic(func, *args, **kwargs):
+ return await func(*args, **kwargs) # type: ignore[misc]
- @wraps(func)
- async def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R:
- with start_active_span(opname):
- return await func(*args, **kwargs) # type: ignore[misc]
+ else:
+ # The other case here handles both sync functions and those
+ # decorated with `inlineCallbacks` (i.e. functions that return a Deferred).
+ @wraps(func)
+ def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+ scope = wrapping_logic(func, *args, **kwargs)
+ scope.__enter__()
- else:
- # The other case here handles both sync functions and those
- # decorated with inlineDeferred.
- @wraps(func)
- def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R:
- scope = start_active_span(opname)
- scope.__enter__()
-
- try:
- result = func(*args, **kwargs)
- if isinstance(result, defer.Deferred):
-
- def call_back(result: R) -> R:
- scope.__exit__(None, None, None)
- return result
-
- def err_back(result: R) -> R:
- scope.__exit__(None, None, None)
- return result
-
- result.addCallbacks(call_back, err_back)
-
- else:
- if inspect.isawaitable(result):
- logger.error(
- "@trace may not have wrapped %s correctly! "
- "The function is not async but returned a %s.",
- func.__qualname__,
- type(result).__name__,
- )
+ try:
+ result = func(*args, **kwargs)
+ if isinstance(result, defer.Deferred):
+ def call_back(result: R) -> R:
scope.__exit__(None, None, None)
+ return result
- return result
+ def err_back(result: R) -> R:
+ scope.__exit__(None, None, None)
+ return result
+
+ result.addCallbacks(call_back, err_back)
+
+ else:
+ if inspect.isawaitable(result):
+ logger.error(
+ "@trace may not have wrapped %s correctly! "
+ "The function is not async but returned a %s.",
+ func.__qualname__,
+ type(result).__name__,
+ )
- except Exception as e:
- scope.__exit__(type(e), None, e.__traceback__)
- raise
+ scope.__exit__(None, None, None)
- return _trace_inner # type: ignore[return-value]
+ return result
- return decorator
+ except Exception as e:
+ scope.__exit__(type(e), None, e.__traceback__)
+ raise
+
+ return _wrapper # type: ignore[return-value]
+
+
+def trace_with_opname(
+ opname: str,
+ *,
+ tracer: Optional["opentracing.Tracer"] = None,
+) -> Callable[[Callable[P, R]], Callable[P, R]]:
+ """
+ Decorator to trace a function with a custom opname.
+ See the module's doc string for usage examples.
+ """
+
+ # type-ignore: mypy bug, see https://github.com/python/mypy/issues/12909
+ @contextlib.contextmanager # type: ignore[arg-type]
+ def _wrapping_logic(
+ func: Callable[P, R], *args: P.args, **kwargs: P.kwargs
+ ) -> Generator[None, None, None]:
+ with start_active_span(opname, tracer=tracer):
+ yield
+
+ def _decorator(func: Callable[P, R]) -> Callable[P, R]:
+ if not opentracing:
+ return func
+
+ return _custom_sync_async_decorator(func, _wrapping_logic)
+
+ return _decorator
def trace(func: Callable[P, R]) -> Callable[P, R]:
"""
Decorator to trace a function.
-
Sets the operation name to that of the function's name.
-
See the module's doc string for usage examples.
"""
@@ -900,22 +969,36 @@ def trace(func: Callable[P, R]) -> Callable[P, R]:
def tag_args(func: Callable[P, R]) -> Callable[P, R]:
"""
- Tags all of the args to the active span.
+ Decorator to tag all of the args to the active span.
+
+ Args:
+ func: `func` is assumed to be a method taking a `self` parameter, or a
+ `classmethod` taking a `cls` parameter. In either case, a tag is not
+ created for this parameter.
"""
if not opentracing:
return func
- @wraps(func)
- def _tag_args_inner(*args: P.args, **kwargs: P.kwargs) -> R:
+ # type-ignore: mypy bug, see https://github.com/python/mypy/issues/12909
+ @contextlib.contextmanager # type: ignore[arg-type]
+ def _wrapping_logic(
+ func: Callable[P, R], *args: P.args, **kwargs: P.kwargs
+ ) -> Generator[None, None, None]:
argspec = inspect.getfullargspec(func)
- for i, arg in enumerate(argspec.args[1:]):
- set_tag("ARG_" + arg, str(args[i])) # type: ignore[index]
- set_tag("args", str(args[len(argspec.args) :])) # type: ignore[index]
- set_tag("kwargs", str(kwargs))
- return func(*args, **kwargs)
+ # We use `[1:]` to skip the `self` object reference and `start=1` to
+ # make the index line up with `argspec.args`.
+ #
+ # FIXME: We could update this to handle any type of function by ignoring the
+ # first argument only if it's named `self` or `cls`. This isn't fool-proof
+ # but handles the idiomatic cases.
+ for i, arg in enumerate(args[1:], start=1):
+ set_tag(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i], str(arg))
+ set_tag(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :]))
+ set_tag(SynapseTags.FUNC_KWARGS, str(kwargs))
+ yield
- return _tag_args_inner
+ return _custom_sync_async_decorator(func, _wrapping_logic)
@contextlib.contextmanager
@@ -962,11 +1045,11 @@ def trace_servlet(
# with JsonResource).
scope.span.set_operation_name(request.request_metrics.name)
- # set the tags *after* the servlet completes, in case it decided to
- # prioritise the span (tags will get dropped on unprioritised spans)
request_tags[
SynapseTags.REQUEST_TAG
] = request.request_metrics.start_context.tag
+ # set the tags *after* the servlet completes, in case it decided to
+ # prioritise the span (tags will get dropped on unprioritised spans)
for k, v in request_tags.items():
scope.span.set_tag(k, v)
|