diff options
Diffstat (limited to 'synapse/logging')
-rw-r--r-- | synapse/logging/_structured.py | 6 | ||||
-rw-r--r-- | synapse/logging/_terse_json.py | 4 | ||||
-rw-r--r-- | synapse/logging/context.py | 35 | ||||
-rw-r--r-- | synapse/logging/formatter.py | 3 | ||||
-rw-r--r-- | synapse/logging/opentracing.py | 173 | ||||
-rw-r--r-- | synapse/logging/scopecontextmanager.py | 2 | ||||
-rw-r--r-- | synapse/logging/utils.py | 126 |
7 files changed, 128 insertions, 221 deletions
diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py index 7372450b45..144506c8f2 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py @@ -55,7 +55,7 @@ def stdlib_log_level_to_twisted(level: str) -> LogLevel: @attr.s @implementer(ILogObserver) -class LogContextObserver(object): +class LogContextObserver: """ An ILogObserver which adds Synapse-specific log context information. @@ -169,7 +169,7 @@ class OutputPipeType(Values): @attr.s -class DrainConfiguration(object): +class DrainConfiguration: name = attr.ib() type = attr.ib() location = attr.ib() @@ -177,7 +177,7 @@ class DrainConfiguration(object): @attr.s -class NetworkJSONTerseOptions(object): +class NetworkJSONTerseOptions: maximum_buffer = attr.ib(type=int) diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index c0b9384189..1b8916cfa2 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -152,7 +152,7 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb @attr.s @implementer(IPushProducer) -class LogProducer(object): +class LogProducer: """ An IPushProducer that writes logs from its buffer to its transport when it is resumed. @@ -190,7 +190,7 @@ class LogProducer(object): @attr.s @implementer(ILogObserver) -class TerseJSONToTCPLogObserver(object): +class TerseJSONToTCPLogObserver: """ An IObserver that writes JSON logs to a TCP target. diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 8b9c4e38bd..22598e02d2 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -74,7 +74,7 @@ except Exception: get_thread_id = threading.get_ident -class ContextResourceUsage(object): +class ContextResourceUsage: """Object for tracking the resources used by a log context Attributes: @@ -179,7 +179,7 @@ class ContextResourceUsage(object): LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"] -class _Sentinel(object): +class _Sentinel: """Sentinel to represent the root context""" __slots__ = ["previous_context", "finished", "request", "scope", "tag"] @@ -226,7 +226,7 @@ class _Sentinel(object): SENTINEL_CONTEXT = _Sentinel() -class LoggingContext(object): +class LoggingContext: """Additional context for log formatting. Contexts are scoped within a "with" block. @@ -566,36 +566,33 @@ class LoggingContextFilter(logging.Filter): return True -class PreserveLoggingContext(object): - """Captures the current logging context and restores it when the scope is - exited. Used to restore the context after a function using - @defer.inlineCallbacks is resumed by a callback from the reactor.""" +class PreserveLoggingContext: + """Context manager which replaces the logging context - __slots__ = ["current_context", "new_context", "has_parent"] + The previous logging context is restored on exit.""" + + __slots__ = ["_old_context", "_new_context"] def __init__( self, new_context: LoggingContextOrSentinel = SENTINEL_CONTEXT ) -> None: - self.new_context = new_context + self._new_context = new_context def __enter__(self) -> None: - """Captures the current logging context""" - self.current_context = set_current_context(self.new_context) - - if self.current_context: - self.has_parent = self.current_context.previous_context is not None + self._old_context = set_current_context(self._new_context) def __exit__(self, type, value, traceback) -> None: - """Restores the current logging context""" - context = set_current_context(self.current_context) + context = set_current_context(self._old_context) - if context != self.new_context: + if context != self._new_context: if not context: - logger.warning("Expected logging context %s was lost", self.new_context) + logger.warning( + "Expected logging context %s was lost", self._new_context + ) else: logger.warning( "Expected logging context %s but found %s", - self.new_context, + self._new_context, context, ) diff --git a/synapse/logging/formatter.py b/synapse/logging/formatter.py index fbf570c756..d736ad5b9b 100644 --- a/synapse/logging/formatter.py +++ b/synapse/logging/formatter.py @@ -16,8 +16,7 @@ import logging import traceback - -from six import StringIO +from io import StringIO class LogFormatter(logging.Formatter): diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 5dddf57008..7df0aa197d 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -164,28 +164,28 @@ Gotchas than one caller? Will all of those calling functions have be in a context with an active span? """ - import contextlib import inspect import logging import re -import types from functools import wraps -from typing import TYPE_CHECKING, Dict +from typing import TYPE_CHECKING, Dict, Optional, Type -from canonicaljson import json +import attr from twisted.internet import defer from synapse.config import ConfigError +from synapse.util import json_decoder, json_encoder if TYPE_CHECKING: + from synapse.http.site import SynapseRequest from synapse.server import HomeServer # Helper class -class _DummyTagNames(object): +class _DummyTagNames: """wrapper of opentracings tags. We need to have them if we want to reference them without opentracing around. Clearly they should never actually show up in a trace. `set_tags` overwrites @@ -226,12 +226,37 @@ except ImportError: tags = _DummyTagNames try: from jaeger_client import Config as JaegerConfig + from synapse.logging.scopecontextmanager import LogContextScopeManager except ImportError: JaegerConfig = None # type: ignore LogContextScopeManager = None # type: ignore +try: + from rust_python_jaeger_reporter import Reporter + + @attr.s(slots=True, frozen=True) + class _WrappedRustReporter: + """Wrap the reporter to ensure `report_span` never throws. + """ + + _reporter = attr.ib(type=Reporter, default=attr.Factory(Reporter)) + + def set_process(self, *args, **kwargs): + return self._reporter.set_process(*args, **kwargs) + + def report_span(self, span): + try: + return self._reporter.report_span(span) + except Exception: + logger.exception("Failed to report span") + + RustReporter = _WrappedRustReporter # type: Optional[Type[_WrappedRustReporter]] +except ImportError: + RustReporter = None + + logger = logging.getLogger(__name__) @@ -320,11 +345,19 @@ def init_tracer(hs: "HomeServer"): set_homeserver_whitelist(hs.config.opentracer_whitelist) - JaegerConfig( + config = JaegerConfig( config=hs.config.jaeger_config, service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()), scope_manager=LogContextScopeManager(hs.config), - ).initialize_tracer() + ) + + # If we have the rust jaeger reporter available let's use that. + if RustReporter: + logger.info("Using rust_python_jaeger_reporter library") + tracer = config.create_tracer(RustReporter(), config.sampler) + opentracing.set_global_tracer(tracer) + else: + config.initialize_tracer() # Whitelisting @@ -466,7 +499,9 @@ def start_active_span_from_edu( if opentracing is None: return _noop_context_manager() - carrier = json.loads(edu_content.get("context", "{}")).get("opentracing", {}) + carrier = json_decoder.decode(edu_content.get("context", "{}")).get( + "opentracing", {} + ) context = opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier) _references = [ opentracing.child_of(span_context_from_string(x)) @@ -657,7 +692,7 @@ def active_span_context_as_string(): opentracing.tracer.inject( opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier ) - return json.dumps(carrier) + return json_encoder.encode(carrier) @only_if_tracing @@ -666,7 +701,7 @@ def span_context_from_string(carrier): Returns: The active span context decoded from a string. """ - carrier = json.loads(carrier) + carrier = json_decoder.decode(carrier) return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier) @@ -700,37 +735,43 @@ def trace(func=None, opname=None): _opname = opname if opname else func.__name__ - @wraps(func) - def _trace_inner(*args, **kwargs): - if opentracing is None: - return func(*args, **kwargs) + if inspect.iscoroutinefunction(func): - scope = start_active_span(_opname) - scope.__enter__() + @wraps(func) + async def _trace_inner(*args, **kwargs): + with start_active_span(_opname): + return await func(*args, **kwargs) - try: - result = func(*args, **kwargs) - if isinstance(result, defer.Deferred): + else: + # The other case here handles both sync functions and those + # decorated with inlineDeferred. + @wraps(func) + def _trace_inner(*args, **kwargs): + scope = start_active_span(_opname) + scope.__enter__() - def call_back(result): - scope.__exit__(None, None, None) - return result + try: + result = func(*args, **kwargs) + if isinstance(result, defer.Deferred): - def err_back(result): - scope.span.set_tag(tags.ERROR, True) - scope.__exit__(None, None, None) - return result + def call_back(result): + scope.__exit__(None, None, None) + return result - result.addCallbacks(call_back, err_back) + def err_back(result): + scope.__exit__(None, None, None) + return result - else: - scope.__exit__(None, None, None) + result.addCallbacks(call_back, err_back) - return result + else: + scope.__exit__(None, None, None) - except Exception as e: - scope.__exit__(type(e), None, e.__traceback__) - raise + return result + + except Exception as e: + scope.__exit__(type(e), None, e.__traceback__) + raise return _trace_inner @@ -760,48 +801,42 @@ def tag_args(func): return _tag_args_inner -def trace_servlet(servlet_name, extract_context=False): - """Decorator which traces a serlet. It starts a span with some servlet specific - tags such as the servlet_name and request information +@contextlib.contextmanager +def trace_servlet(request: "SynapseRequest", extract_context: bool = False): + """Returns a context manager which traces a request. It starts a span + with some servlet specific tags such as the request metrics name and + request information. Args: - servlet_name (str): The name to be used for the span's operation_name - extract_context (bool): Whether to attempt to extract the opentracing + request + extract_context: Whether to attempt to extract the opentracing context from the request the servlet is handling. - """ - def _trace_servlet_inner_1(func): - if not opentracing: - return func - - @wraps(func) - async def _trace_servlet_inner(request, *args, **kwargs): - request_tags = { - "request_id": request.get_request_id(), - tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, - tags.HTTP_METHOD: request.get_method(), - tags.HTTP_URL: request.get_redacted_uri(), - tags.PEER_HOST_IPV6: request.getClientIP(), - } - - if extract_context: - scope = start_active_span_from_request( - request, servlet_name, tags=request_tags - ) - else: - scope = start_active_span(servlet_name, tags=request_tags) - - with scope: - result = func(request, *args, **kwargs) + if opentracing is None: + yield + return - if not isinstance(result, (types.CoroutineType, defer.Deferred)): - # Some servlets aren't async and just return results - # directly, so we handle that here. - return result + request_tags = { + "request_id": request.get_request_id(), + tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, + tags.HTTP_METHOD: request.get_method(), + tags.HTTP_URL: request.get_redacted_uri(), + tags.PEER_HOST_IPV6: request.getClientIP(), + } - return await result + request_name = request.request_metrics.name + if extract_context: + scope = start_active_span_from_request(request, request_name, tags=request_tags) + else: + scope = start_active_span(request_name, tags=request_tags) - return _trace_servlet_inner + with scope: + try: + yield + finally: + # We set the operation name again in case its changed (which happens + # with JsonResource). + scope.span.set_operation_name(request.request_metrics.name) - return _trace_servlet_inner_1 + scope.span.set_tag("request_tag", request.request_metrics.start_context.tag) diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py index dc3ab00cbb..026854b4c7 100644 --- a/synapse/logging/scopecontextmanager.py +++ b/synapse/logging/scopecontextmanager.py @@ -116,6 +116,8 @@ class _LogContextScope(Scope): if self._enter_logcontext: self.logcontext.__enter__() + return self + def __exit__(self, type, value, traceback): if type == twisted.internet.defer._DefGen_Return: super(_LogContextScope, self).__exit__(None, None, None) diff --git a/synapse/logging/utils.py b/synapse/logging/utils.py index 99049bb5d8..fea774e2e5 100644 --- a/synapse/logging/utils.py +++ b/synapse/logging/utils.py @@ -14,9 +14,7 @@ # limitations under the License. -import inspect import logging -import time from functools import wraps from inspect import getcallargs @@ -74,127 +72,3 @@ def log_function(f): wrapped.__name__ = func_name return wrapped - - -def time_function(f): - func_name = f.__name__ - - @wraps(f) - def wrapped(*args, **kwargs): - global _TIME_FUNC_ID - id = _TIME_FUNC_ID - _TIME_FUNC_ID += 1 - - start = time.clock() - - try: - _log_debug_as_f(f, "[FUNC START] {%s-%d}", (func_name, id)) - - r = f(*args, **kwargs) - finally: - end = time.clock() - _log_debug_as_f( - f, "[FUNC END] {%s-%d} %.3f sec", (func_name, id, end - start) - ) - - return r - - return wrapped - - -def trace_function(f): - func_name = f.__name__ - linenum = f.func_code.co_firstlineno - pathname = f.func_code.co_filename - - @wraps(f) - def wrapped(*args, **kwargs): - name = f.__module__ - logger = logging.getLogger(name) - level = logging.DEBUG - - frame = inspect.currentframe() - if frame is None: - raise Exception("Can't get current frame!") - - s = frame.f_back - - to_print = [ - "\t%s:%s %s. Args: args=%s, kwargs=%s" - % (pathname, linenum, func_name, args, kwargs) - ] - while s: - if True or s.f_globals["__name__"].startswith("synapse"): - filename, lineno, function, _, _ = inspect.getframeinfo(s) - args_string = inspect.formatargvalues(*inspect.getargvalues(s)) - - to_print.append( - "\t%s:%d %s. Args: %s" % (filename, lineno, function, args_string) - ) - - s = s.f_back - - msg = "\nTraceback for %s:\n" % (func_name,) + "\n".join(to_print) - - record = logging.LogRecord( - name=name, - level=level, - pathname=pathname, - lineno=lineno, - msg=msg, - args=(), - exc_info=None, - ) - - logger.handle(record) - - return f(*args, **kwargs) - - wrapped.__name__ = func_name - return wrapped - - -def get_previous_frames(): - - frame = inspect.currentframe() - if frame is None: - raise Exception("Can't get current frame!") - - s = frame.f_back.f_back - to_return = [] - while s: - if s.f_globals["__name__"].startswith("synapse"): - filename, lineno, function, _, _ = inspect.getframeinfo(s) - args_string = inspect.formatargvalues(*inspect.getargvalues(s)) - - to_return.append( - "{{ %s:%d %s - Args: %s }}" % (filename, lineno, function, args_string) - ) - - s = s.f_back - - return ", ".join(to_return) - - -def get_previous_frame(ignore=[]): - frame = inspect.currentframe() - if frame is None: - raise Exception("Can't get current frame!") - s = frame.f_back.f_back - - while s: - if s.f_globals["__name__"].startswith("synapse"): - if not any(s.f_globals["__name__"].startswith(ig) for ig in ignore): - filename, lineno, function, _, _ = inspect.getframeinfo(s) - args_string = inspect.formatargvalues(*inspect.getargvalues(s)) - - return "{{ %s:%d %s - Args: %s }}" % ( - filename, - lineno, - function, - args_string, - ) - - s = s.f_back - - return None |