diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py
index 7372450b45..144506c8f2 100644
--- a/synapse/logging/_structured.py
+++ b/synapse/logging/_structured.py
@@ -55,7 +55,7 @@ def stdlib_log_level_to_twisted(level: str) -> LogLevel:
@attr.s
@implementer(ILogObserver)
-class LogContextObserver(object):
+class LogContextObserver:
"""
An ILogObserver which adds Synapse-specific log context information.
@@ -169,7 +169,7 @@ class OutputPipeType(Values):
@attr.s
-class DrainConfiguration(object):
+class DrainConfiguration:
name = attr.ib()
type = attr.ib()
location = attr.ib()
@@ -177,7 +177,7 @@ class DrainConfiguration(object):
@attr.s
-class NetworkJSONTerseOptions(object):
+class NetworkJSONTerseOptions:
maximum_buffer = attr.ib(type=int)
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
index c0b9384189..1b8916cfa2 100644
--- a/synapse/logging/_terse_json.py
+++ b/synapse/logging/_terse_json.py
@@ -152,7 +152,7 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb
@attr.s
@implementer(IPushProducer)
-class LogProducer(object):
+class LogProducer:
"""
An IPushProducer that writes logs from its buffer to its transport when it
is resumed.
@@ -190,7 +190,7 @@ class LogProducer(object):
@attr.s
@implementer(ILogObserver)
-class TerseJSONToTCPLogObserver(object):
+class TerseJSONToTCPLogObserver:
"""
An IObserver that writes JSON logs to a TCP target.
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index 8b9c4e38bd..22598e02d2 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -74,7 +74,7 @@ except Exception:
get_thread_id = threading.get_ident
-class ContextResourceUsage(object):
+class ContextResourceUsage:
"""Object for tracking the resources used by a log context
Attributes:
@@ -179,7 +179,7 @@ class ContextResourceUsage(object):
LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"]
-class _Sentinel(object):
+class _Sentinel:
"""Sentinel to represent the root context"""
__slots__ = ["previous_context", "finished", "request", "scope", "tag"]
@@ -226,7 +226,7 @@ class _Sentinel(object):
SENTINEL_CONTEXT = _Sentinel()
-class LoggingContext(object):
+class LoggingContext:
"""Additional context for log formatting. Contexts are scoped within a
"with" block.
@@ -566,36 +566,33 @@ class LoggingContextFilter(logging.Filter):
return True
-class PreserveLoggingContext(object):
- """Captures the current logging context and restores it when the scope is
- exited. Used to restore the context after a function using
- @defer.inlineCallbacks is resumed by a callback from the reactor."""
+class PreserveLoggingContext:
+ """Context manager which replaces the logging context
- __slots__ = ["current_context", "new_context", "has_parent"]
+ The previous logging context is restored on exit."""
+
+ __slots__ = ["_old_context", "_new_context"]
def __init__(
self, new_context: LoggingContextOrSentinel = SENTINEL_CONTEXT
) -> None:
- self.new_context = new_context
+ self._new_context = new_context
def __enter__(self) -> None:
- """Captures the current logging context"""
- self.current_context = set_current_context(self.new_context)
-
- if self.current_context:
- self.has_parent = self.current_context.previous_context is not None
+ self._old_context = set_current_context(self._new_context)
def __exit__(self, type, value, traceback) -> None:
- """Restores the current logging context"""
- context = set_current_context(self.current_context)
+ context = set_current_context(self._old_context)
- if context != self.new_context:
+ if context != self._new_context:
if not context:
- logger.warning("Expected logging context %s was lost", self.new_context)
+ logger.warning(
+ "Expected logging context %s was lost", self._new_context
+ )
else:
logger.warning(
"Expected logging context %s but found %s",
- self.new_context,
+ self._new_context,
context,
)
diff --git a/synapse/logging/formatter.py b/synapse/logging/formatter.py
index fbf570c756..d736ad5b9b 100644
--- a/synapse/logging/formatter.py
+++ b/synapse/logging/formatter.py
@@ -16,8 +16,7 @@
import logging
import traceback
-
-from six import StringIO
+from io import StringIO
class LogFormatter(logging.Formatter):
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 5dddf57008..7df0aa197d 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -164,28 +164,28 @@ Gotchas
than one caller? Will all of those calling functions have be in a context
with an active span?
"""
-
import contextlib
import inspect
import logging
import re
-import types
from functools import wraps
-from typing import TYPE_CHECKING, Dict
+from typing import TYPE_CHECKING, Dict, Optional, Type
-from canonicaljson import json
+import attr
from twisted.internet import defer
from synapse.config import ConfigError
+from synapse.util import json_decoder, json_encoder
if TYPE_CHECKING:
+ from synapse.http.site import SynapseRequest
from synapse.server import HomeServer
# Helper class
-class _DummyTagNames(object):
+class _DummyTagNames:
"""wrapper of opentracings tags. We need to have them if we
want to reference them without opentracing around. Clearly they
should never actually show up in a trace. `set_tags` overwrites
@@ -226,12 +226,37 @@ except ImportError:
tags = _DummyTagNames
try:
from jaeger_client import Config as JaegerConfig
+
from synapse.logging.scopecontextmanager import LogContextScopeManager
except ImportError:
JaegerConfig = None # type: ignore
LogContextScopeManager = None # type: ignore
+try:
+ from rust_python_jaeger_reporter import Reporter
+
+ @attr.s(slots=True, frozen=True)
+ class _WrappedRustReporter:
+ """Wrap the reporter to ensure `report_span` never throws.
+ """
+
+ _reporter = attr.ib(type=Reporter, default=attr.Factory(Reporter))
+
+ def set_process(self, *args, **kwargs):
+ return self._reporter.set_process(*args, **kwargs)
+
+ def report_span(self, span):
+ try:
+ return self._reporter.report_span(span)
+ except Exception:
+ logger.exception("Failed to report span")
+
+ RustReporter = _WrappedRustReporter # type: Optional[Type[_WrappedRustReporter]]
+except ImportError:
+ RustReporter = None
+
+
logger = logging.getLogger(__name__)
@@ -320,11 +345,19 @@ def init_tracer(hs: "HomeServer"):
set_homeserver_whitelist(hs.config.opentracer_whitelist)
- JaegerConfig(
+ config = JaegerConfig(
config=hs.config.jaeger_config,
service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()),
scope_manager=LogContextScopeManager(hs.config),
- ).initialize_tracer()
+ )
+
+ # If we have the rust jaeger reporter available let's use that.
+ if RustReporter:
+ logger.info("Using rust_python_jaeger_reporter library")
+ tracer = config.create_tracer(RustReporter(), config.sampler)
+ opentracing.set_global_tracer(tracer)
+ else:
+ config.initialize_tracer()
# Whitelisting
@@ -466,7 +499,9 @@ def start_active_span_from_edu(
if opentracing is None:
return _noop_context_manager()
- carrier = json.loads(edu_content.get("context", "{}")).get("opentracing", {})
+ carrier = json_decoder.decode(edu_content.get("context", "{}")).get(
+ "opentracing", {}
+ )
context = opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
_references = [
opentracing.child_of(span_context_from_string(x))
@@ -657,7 +692,7 @@ def active_span_context_as_string():
opentracing.tracer.inject(
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
)
- return json.dumps(carrier)
+ return json_encoder.encode(carrier)
@only_if_tracing
@@ -666,7 +701,7 @@ def span_context_from_string(carrier):
Returns:
The active span context decoded from a string.
"""
- carrier = json.loads(carrier)
+ carrier = json_decoder.decode(carrier)
return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
@@ -700,37 +735,43 @@ def trace(func=None, opname=None):
_opname = opname if opname else func.__name__
- @wraps(func)
- def _trace_inner(*args, **kwargs):
- if opentracing is None:
- return func(*args, **kwargs)
+ if inspect.iscoroutinefunction(func):
- scope = start_active_span(_opname)
- scope.__enter__()
+ @wraps(func)
+ async def _trace_inner(*args, **kwargs):
+ with start_active_span(_opname):
+ return await func(*args, **kwargs)
- try:
- result = func(*args, **kwargs)
- if isinstance(result, defer.Deferred):
+ else:
+ # The other case here handles both sync functions and those
+ # decorated with inlineDeferred.
+ @wraps(func)
+ def _trace_inner(*args, **kwargs):
+ scope = start_active_span(_opname)
+ scope.__enter__()
- def call_back(result):
- scope.__exit__(None, None, None)
- return result
+ try:
+ result = func(*args, **kwargs)
+ if isinstance(result, defer.Deferred):
- def err_back(result):
- scope.span.set_tag(tags.ERROR, True)
- scope.__exit__(None, None, None)
- return result
+ def call_back(result):
+ scope.__exit__(None, None, None)
+ return result
- result.addCallbacks(call_back, err_back)
+ def err_back(result):
+ scope.__exit__(None, None, None)
+ return result
- else:
- scope.__exit__(None, None, None)
+ result.addCallbacks(call_back, err_back)
- return result
+ else:
+ scope.__exit__(None, None, None)
- except Exception as e:
- scope.__exit__(type(e), None, e.__traceback__)
- raise
+ return result
+
+ except Exception as e:
+ scope.__exit__(type(e), None, e.__traceback__)
+ raise
return _trace_inner
@@ -760,48 +801,42 @@ def tag_args(func):
return _tag_args_inner
-def trace_servlet(servlet_name, extract_context=False):
- """Decorator which traces a serlet. It starts a span with some servlet specific
- tags such as the servlet_name and request information
+@contextlib.contextmanager
+def trace_servlet(request: "SynapseRequest", extract_context: bool = False):
+ """Returns a context manager which traces a request. It starts a span
+ with some servlet specific tags such as the request metrics name and
+ request information.
Args:
- servlet_name (str): The name to be used for the span's operation_name
- extract_context (bool): Whether to attempt to extract the opentracing
+ request
+ extract_context: Whether to attempt to extract the opentracing
context from the request the servlet is handling.
-
"""
- def _trace_servlet_inner_1(func):
- if not opentracing:
- return func
-
- @wraps(func)
- async def _trace_servlet_inner(request, *args, **kwargs):
- request_tags = {
- "request_id": request.get_request_id(),
- tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
- tags.HTTP_METHOD: request.get_method(),
- tags.HTTP_URL: request.get_redacted_uri(),
- tags.PEER_HOST_IPV6: request.getClientIP(),
- }
-
- if extract_context:
- scope = start_active_span_from_request(
- request, servlet_name, tags=request_tags
- )
- else:
- scope = start_active_span(servlet_name, tags=request_tags)
-
- with scope:
- result = func(request, *args, **kwargs)
+ if opentracing is None:
+ yield
+ return
- if not isinstance(result, (types.CoroutineType, defer.Deferred)):
- # Some servlets aren't async and just return results
- # directly, so we handle that here.
- return result
+ request_tags = {
+ "request_id": request.get_request_id(),
+ tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
+ tags.HTTP_METHOD: request.get_method(),
+ tags.HTTP_URL: request.get_redacted_uri(),
+ tags.PEER_HOST_IPV6: request.getClientIP(),
+ }
- return await result
+ request_name = request.request_metrics.name
+ if extract_context:
+ scope = start_active_span_from_request(request, request_name, tags=request_tags)
+ else:
+ scope = start_active_span(request_name, tags=request_tags)
- return _trace_servlet_inner
+ with scope:
+ try:
+ yield
+ finally:
+ # We set the operation name again in case its changed (which happens
+ # with JsonResource).
+ scope.span.set_operation_name(request.request_metrics.name)
- return _trace_servlet_inner_1
+ scope.span.set_tag("request_tag", request.request_metrics.start_context.tag)
diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py
index dc3ab00cbb..026854b4c7 100644
--- a/synapse/logging/scopecontextmanager.py
+++ b/synapse/logging/scopecontextmanager.py
@@ -116,6 +116,8 @@ class _LogContextScope(Scope):
if self._enter_logcontext:
self.logcontext.__enter__()
+ return self
+
def __exit__(self, type, value, traceback):
if type == twisted.internet.defer._DefGen_Return:
super(_LogContextScope, self).__exit__(None, None, None)
diff --git a/synapse/logging/utils.py b/synapse/logging/utils.py
index 99049bb5d8..fea774e2e5 100644
--- a/synapse/logging/utils.py
+++ b/synapse/logging/utils.py
@@ -14,9 +14,7 @@
# limitations under the License.
-import inspect
import logging
-import time
from functools import wraps
from inspect import getcallargs
@@ -74,127 +72,3 @@ def log_function(f):
wrapped.__name__ = func_name
return wrapped
-
-
-def time_function(f):
- func_name = f.__name__
-
- @wraps(f)
- def wrapped(*args, **kwargs):
- global _TIME_FUNC_ID
- id = _TIME_FUNC_ID
- _TIME_FUNC_ID += 1
-
- start = time.clock()
-
- try:
- _log_debug_as_f(f, "[FUNC START] {%s-%d}", (func_name, id))
-
- r = f(*args, **kwargs)
- finally:
- end = time.clock()
- _log_debug_as_f(
- f, "[FUNC END] {%s-%d} %.3f sec", (func_name, id, end - start)
- )
-
- return r
-
- return wrapped
-
-
-def trace_function(f):
- func_name = f.__name__
- linenum = f.func_code.co_firstlineno
- pathname = f.func_code.co_filename
-
- @wraps(f)
- def wrapped(*args, **kwargs):
- name = f.__module__
- logger = logging.getLogger(name)
- level = logging.DEBUG
-
- frame = inspect.currentframe()
- if frame is None:
- raise Exception("Can't get current frame!")
-
- s = frame.f_back
-
- to_print = [
- "\t%s:%s %s. Args: args=%s, kwargs=%s"
- % (pathname, linenum, func_name, args, kwargs)
- ]
- while s:
- if True or s.f_globals["__name__"].startswith("synapse"):
- filename, lineno, function, _, _ = inspect.getframeinfo(s)
- args_string = inspect.formatargvalues(*inspect.getargvalues(s))
-
- to_print.append(
- "\t%s:%d %s. Args: %s" % (filename, lineno, function, args_string)
- )
-
- s = s.f_back
-
- msg = "\nTraceback for %s:\n" % (func_name,) + "\n".join(to_print)
-
- record = logging.LogRecord(
- name=name,
- level=level,
- pathname=pathname,
- lineno=lineno,
- msg=msg,
- args=(),
- exc_info=None,
- )
-
- logger.handle(record)
-
- return f(*args, **kwargs)
-
- wrapped.__name__ = func_name
- return wrapped
-
-
-def get_previous_frames():
-
- frame = inspect.currentframe()
- if frame is None:
- raise Exception("Can't get current frame!")
-
- s = frame.f_back.f_back
- to_return = []
- while s:
- if s.f_globals["__name__"].startswith("synapse"):
- filename, lineno, function, _, _ = inspect.getframeinfo(s)
- args_string = inspect.formatargvalues(*inspect.getargvalues(s))
-
- to_return.append(
- "{{ %s:%d %s - Args: %s }}" % (filename, lineno, function, args_string)
- )
-
- s = s.f_back
-
- return ", ".join(to_return)
-
-
-def get_previous_frame(ignore=[]):
- frame = inspect.currentframe()
- if frame is None:
- raise Exception("Can't get current frame!")
- s = frame.f_back.f_back
-
- while s:
- if s.f_globals["__name__"].startswith("synapse"):
- if not any(s.f_globals["__name__"].startswith(ig) for ig in ignore):
- filename, lineno, function, _, _ = inspect.getframeinfo(s)
- args_string = inspect.formatargvalues(*inspect.getargvalues(s))
-
- return "{{ %s:%d %s - Args: %s }}" % (
- filename,
- lineno,
- function,
- args_string,
- )
-
- s = s.f_back
-
- return None
|