diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 523bad0c55..61da585ad0 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -31,9 +31,9 @@ from synapse.api.errors import (
from synapse.appservice import ApplicationService
from synapse.http import get_request_user_agent
from synapse.http.site import SynapseRequest
-from synapse.logging.opentracing import (
- active_span,
+from synapse.logging.tracing import (
force_tracing,
+ get_active_span,
start_active_span,
trace,
)
@@ -141,7 +141,7 @@ class Auth:
is invalid.
AuthError if access is denied for the user in the access token
"""
- parent_span = active_span()
+ parent_span = get_active_span()
with start_active_span("get_user_by_req"):
requester = await self._wrapped_get_user_by_req(
request, allow_guest, allow_expired
@@ -151,19 +151,18 @@ class Auth:
if requester.authenticated_entity in self._force_tracing_for_users:
# request tracing is enabled for this user, so we need to force it
# tracing on for the parent span (which will be the servlet span).
- #
+ force_tracing(parent_span)
# It's too late for the get_user_by_req span to inherit the setting,
# so we also force it on for that.
force_tracing()
- force_tracing(parent_span)
- parent_span.set_tag(
+ parent_span.set_attribute(
"authenticated_entity", requester.authenticated_entity
)
- parent_span.set_tag("user_id", requester.user.to_string())
+ parent_span.set_attribute("user_id", requester.user.to_string())
if requester.device_id is not None:
- parent_span.set_tag("device_id", requester.device_id)
+ parent_span.set_attribute("device_id", requester.device_id)
if requester.app_service is not None:
- parent_span.set_tag("appservice_id", requester.app_service.id)
+ parent_span.set_attribute("appservice_id", requester.app_service.id)
return requester
async def _wrapped_get_user_by_req(
@@ -174,7 +173,7 @@ class Auth:
) -> Requester:
"""Helper for get_user_by_req
- Once get_user_by_req has set up the opentracing span, this does the actual work.
+ Once get_user_by_req has set up the tracing span, this does the actual work.
"""
try:
ip_addr = request.getClientAddress().host
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 789859e69e..fc04e4d4bd 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -193,6 +193,9 @@ class LimitBlockingTypes:
class EventContentFields:
"""Fields found in events' content, regardless of type."""
+ # Synapse internal content field for tracing
+ TRACING_CONTEXT: Final = "org.matrix.tracing_context"
+
# Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326
LABELS: Final = "org.matrix.labels"
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 923891ae0d..5763352b29 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -62,7 +62,7 @@ from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
from synapse.handlers.auth import load_legacy_password_auth_providers
from synapse.logging.context import PreserveLoggingContext
-from synapse.logging.opentracing import init_tracer
+from synapse.logging.tracing import init_tracer
from synapse.metrics import install_gc_manager, register_threadpool
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
diff --git a/synapse/config/tracer.py b/synapse/config/tracer.py
index c19270c6c5..d67498f50d 100644
--- a/synapse/config/tracer.py
+++ b/synapse/config/tracer.py
@@ -24,41 +24,50 @@ class TracerConfig(Config):
section = "tracing"
def read_config(self, config: JsonDict, **kwargs: Any) -> None:
- opentracing_config = config.get("opentracing")
- if opentracing_config is None:
- opentracing_config = {}
+ tracing_config = config.get("tracing")
+ if tracing_config is None:
+ tracing_config = {}
- self.opentracer_enabled = opentracing_config.get("enabled", False)
+ self.tracing_enabled = tracing_config.get("enabled", False)
- self.jaeger_config = opentracing_config.get(
- "jaeger_config",
- {"sampler": {"type": "const", "param": 1}, "logging": False},
+ self.jaeger_exporter_config = tracing_config.get(
+ "jaeger_exporter_config",
+ {},
)
self.force_tracing_for_users: Set[str] = set()
- if not self.opentracer_enabled:
+ if not self.tracing_enabled:
return
- check_requirements("opentracing")
+ check_requirements("opentelemetry")
# The tracer is enabled so sanitize the config
- self.opentracer_whitelist: List[str] = opentracing_config.get(
+ # Default to always sample. Range: [0.0 - 1.0]
+ self.sample_rate: float = float(tracing_config.get("sample_rate", 1))
+ if self.sample_rate < 0.0 or self.sample_rate > 1.0:
+ raise ConfigError(
+ "Tracing sample_rate must be in range [0.0, 1.0].",
+ ("tracing", "sample_rate"),
+ )
+
+ self.homeserver_whitelist: List[str] = tracing_config.get(
"homeserver_whitelist", []
)
- if not isinstance(self.opentracer_whitelist, list):
- raise ConfigError("Tracer homeserver_whitelist config is malformed")
-
- force_tracing_for_users = opentracing_config.get("force_tracing_for_users", [])
- if not isinstance(force_tracing_for_users, list):
+ if not isinstance(self.homeserver_whitelist, list):
raise ConfigError(
- "Expected a list", ("opentracing", "force_tracing_for_users")
+ "Tracing homeserver_whitelist config is malformed",
+ ("tracing", "homeserver_whitelist"),
)
+
+ force_tracing_for_users = tracing_config.get("force_tracing_for_users", [])
+ if not isinstance(force_tracing_for_users, list):
+ raise ConfigError("Expected a list", ("tracing", "force_tracing_for_users"))
for i, u in enumerate(force_tracing_for_users):
if not isinstance(u, str):
raise ConfigError(
"Expected a string",
- ("opentracing", "force_tracing_for_users", f"index {i}"),
+ ("tracing", "force_tracing_for_users", f"index {i}"),
)
self.force_tracing_for_users.add(u)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 1d60137411..c9cd02ebeb 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -61,7 +61,7 @@ from synapse.logging.context import (
nested_logging_context,
run_in_background,
)
-from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
+from synapse.logging.tracing import log_kv, start_active_span_from_edu, trace
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -1399,7 +1399,7 @@ class FederationHandlerRegistry:
# Check if we have a handler on this instance
handler = self.edu_handlers.get(edu_type)
if handler:
- with start_active_span_from_edu(content, "handle_edu"):
+ with start_active_span_from_edu("handle_edu", edu_content=content):
try:
await handler(origin, content)
except SynapseError as e:
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 41d8b937af..72bc935452 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -32,7 +32,7 @@ from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
-from synapse.logging.opentracing import SynapseTags, set_tag
+from synapse.logging.tracing import SynapseTags, set_attribute
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
@@ -596,7 +596,7 @@ class PerDestinationQueue:
if not message_id:
continue
- set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+ set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
edus = [
Edu(
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 75081810fd..3f2c8bcfa1 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -21,11 +21,13 @@ from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
-from synapse.logging.opentracing import (
+from synapse.logging.tracing import (
+ Link,
+ StatusCode,
extract_text_map,
- set_tag,
- start_active_span_follows_from,
- tags,
+ get_span_context_from_context,
+ set_status,
+ start_active_span,
whitelisted_homeserver,
)
from synapse.types import JsonDict
@@ -79,7 +81,7 @@ class TransactionManager:
edus: List of EDUs to send
"""
- # Make a transaction-sending opentracing span. This span follows on from
+ # Make a transaction-sending tracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
# no active span here, so if the edus were not received by the remote the
# span would have no causality and it would be forgotten.
@@ -88,13 +90,20 @@ class TransactionManager:
keep_destination = whitelisted_homeserver(destination)
for edu in edus:
- context = edu.get_context()
- if context:
- span_contexts.append(extract_text_map(json_decoder.decode(context)))
+ tracing_context_json = edu.get_tracing_context_json()
+ if tracing_context_json:
+ context = extract_text_map(json_decoder.decode(tracing_context_json))
+ if context:
+ span_context = get_span_context_from_context(context)
+ if span_context:
+ span_contexts.append(span_context)
if keep_destination:
- edu.strip_context()
+ edu.strip_tracing_context()
- with start_active_span_follows_from("send_transaction", span_contexts):
+ with start_active_span(
+ "send_transaction",
+ links=[Link(span_context) for span_context in span_contexts],
+ ):
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
@@ -166,7 +175,7 @@ class TransactionManager:
except HttpResponseException as e:
code = e.code
- set_tag(tags.ERROR, True)
+ set_status(StatusCode.ERROR, e)
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index bb0f8d6b7b..9425106a3f 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -15,7 +15,6 @@
import functools
import logging
import re
-import time
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Tuple, cast
@@ -25,12 +24,15 @@ from synapse.http.server import HttpServer, ServletCallback, is_method_cancellab
from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.context import run_in_background
-from synapse.logging.opentracing import (
- active_span,
- set_tag,
- span_context_from_request,
+from synapse.logging.tracing import (
+ Link,
+ context_from_request,
+ create_non_recording_span,
+ get_active_span,
+ set_attribute,
start_active_span,
- start_active_span_follows_from,
+ start_span,
+ use_span,
whitelisted_homeserver,
)
from synapse.types import JsonDict
@@ -308,60 +310,70 @@ class BaseFederationServlet:
logger.warning("authenticate_request failed: %s", e)
raise
- # update the active opentracing span with the authenticated entity
- set_tag("authenticated_entity", str(origin))
+ # update the active tracing span with the authenticated entity
+ set_attribute("authenticated_entity", str(origin))
# if the origin is authenticated and whitelisted, use its span context
# as the parent.
- context = None
+ origin_context = None
if origin and whitelisted_homeserver(origin):
- context = span_context_from_request(request)
-
- if context:
- servlet_span = active_span()
- # a scope which uses the origin's context as a parent
- processing_start_time = time.time()
- scope = start_active_span_follows_from(
+ origin_context = context_from_request(request)
+
+ if origin_context:
+ local_servlet_span = get_active_span()
+ # Create a span which uses the `origin_context` as a parent
+ # so we can see how the incoming payload was processed while
+ # we're looking at the outgoing trace. Since the parent is set
+ # to a remote span (from the origin), it won't show up in the
+ # local trace which is why we create another span below for the
+ # local trace. A span can only have one parent so we have to
+ # create two separate ones.
+ remote_parent_span = start_span(
"incoming-federation-request",
- child_of=context,
- contexts=(servlet_span,),
- start_time=processing_start_time,
+ context=origin_context,
+ # Cross-link back to the local trace so we can jump
+ # to the incoming side from the remote origin trace.
+ links=[Link(local_servlet_span.get_span_context())]
+ if local_servlet_span
+ else None,
)
+ # Create a local span to appear in the local trace
+ local_parent_span_cm = start_active_span(
+ "process-federation-request",
+ # Cross-link back to the remote outgoing trace so we can
+ # jump over there.
+ links=[Link(remote_parent_span.get_span_context())],
+ )
else:
- # just use our context as a parent
- scope = start_active_span(
- "incoming-federation-request",
+ # Otherwise just use our local active servlet context as a parent
+ local_parent_span_cm = start_active_span(
+ "process-federation-request",
)
- try:
- with scope:
- if origin and self.RATELIMIT:
- with ratelimiter.ratelimit(origin) as d:
- await d
- if request._disconnected:
- logger.warning(
- "client disconnected before we started processing "
- "request"
- )
- return None
- response = await func(
- origin, content, request.args, *args, **kwargs
+ # Don't need to record anything for the remote because no remote
+ # trace context given.
+ remote_parent_span = create_non_recording_span()
+
+ remote_parent_span_cm = use_span(remote_parent_span, end_on_exit=True)
+
+ with remote_parent_span_cm, local_parent_span_cm:
+ if origin and self.RATELIMIT:
+ with ratelimiter.ratelimit(origin) as d:
+ await d
+ if request._disconnected:
+ logger.warning(
+ "client disconnected before we started processing "
+ "request"
)
- else:
+ return None
response = await func(
origin, content, request.args, *args, **kwargs
)
- finally:
- # if we used the origin's context as the parent, add a new span using
- # the servlet span as a parent, so that we have a link
- if context:
- scope2 = start_active_span_follows_from(
- "process-federation_request",
- contexts=(scope.span,),
- start_time=processing_start_time,
+ else:
+ response = await func(
+ origin, content, request.args, *args, **kwargs
)
- scope2.close()
return response
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index b9b12fbea5..a6b590269e 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -21,6 +21,7 @@ from typing import List, Optional
import attr
+from synapse.api.constants import EventContentFields
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@@ -54,11 +55,13 @@ class Edu:
"destination": self.destination,
}
- def get_context(self) -> str:
- return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
+ def get_tracing_context_json(self) -> str:
+ return getattr(self, "content", {}).get(
+ EventContentFields.TRACING_CONTEXT, "{}"
+ )
- def strip_context(self) -> None:
- getattr(self, "content", {})["org.matrix.opentracing_context"] = "{}"
+ def strip_tracing_context(self) -> None:
+ getattr(self, "content", {})[EventContentFields.TRACING_CONTEXT] = "{}"
def _none_to_list(edus: Optional[List[JsonDict]]) -> List[JsonDict]:
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 1a8379854c..659ee0ef5e 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -36,7 +36,7 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
-from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.logging.tracing import log_kv, set_attribute, trace
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -86,7 +86,7 @@ class DeviceWorkerHandler:
info on each device
"""
- set_tag("user_id", user_id)
+ set_attribute("user_id", user_id)
device_map = await self.store.get_devices_by_user(user_id)
ips = await self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -118,8 +118,8 @@ class DeviceWorkerHandler:
ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)
- set_tag("device", str(device))
- set_tag("ips", str(ips))
+ set_attribute("device", str(device))
+ set_attribute("ips", str(ips))
return device
@@ -169,8 +169,8 @@ class DeviceWorkerHandler:
joined a room, that `user_id` may be interested in.
"""
- set_tag("user_id", user_id)
- set_tag("from_token", str(from_token))
+ set_attribute("user_id", user_id)
+ set_attribute("from_token", str(from_token))
now_room_key = self.store.get_room_max_token()
room_ids = await self.store.get_rooms_for_user(user_id)
@@ -461,8 +461,8 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
- set_tag("error", True)
- set_tag("reason", "User doesn't have that device id.")
+ set_attribute("error", True)
+ set_attribute("reason", "User doesn't have that device id.")
else:
raise
@@ -688,7 +688,7 @@ class DeviceHandler(DeviceWorkerHandler):
else:
return
- for user_id, device_id, room_id, stream_id, opentracing_context in rows:
+ for user_id, device_id, room_id, stream_id, tracing_context in rows:
hosts = set()
# Ignore any users that aren't ours
@@ -707,7 +707,7 @@ class DeviceHandler(DeviceWorkerHandler):
room_id=room_id,
stream_id=stream_id,
hosts=hosts,
- context=opentracing_context,
+ context=tracing_context,
)
# Notify replication that we've updated the device list stream.
@@ -794,8 +794,8 @@ class DeviceListUpdater:
for parsing the EDU and adding to pending updates list.
"""
- set_tag("origin", origin)
- set_tag("edu_content", str(edu_content))
+ set_attribute("origin", origin)
+ set_attribute("edu_content", str(edu_content))
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
@@ -815,7 +815,7 @@ class DeviceListUpdater:
origin,
)
- set_tag("error", True)
+ set_attribute("error", True)
log_kv(
{
"message": "Got a device list update edu from a user and "
@@ -830,7 +830,7 @@ class DeviceListUpdater:
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
- set_tag("error", True)
+ set_attribute("error", True)
log_kv(
{
"message": "Got an update from a user for which "
@@ -1027,12 +1027,12 @@ class DeviceListUpdater:
# eventually become consistent.
return None
except FederationDeniedError as e:
- set_tag("error", True)
+ set_attribute("error", True)
log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return None
except Exception as e:
- set_tag("error", True)
+ set_attribute("error", True)
log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 444c08bc2e..9c9da0cb63 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,15 +15,15 @@
import logging
from typing import TYPE_CHECKING, Any, Dict
-from synapse.api.constants import EduTypes, ToDeviceEventTypes
+from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
-from synapse.logging.opentracing import (
+from synapse.logging.tracing import (
SynapseTags,
get_active_span_text_map,
log_kv,
- set_tag,
+ set_attribute,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
@@ -217,10 +217,10 @@ class DeviceMessageHandler:
sender_user_id = requester.user.to_string()
message_id = random_string(16)
- set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+ set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
log_kv({"number_of_to_device_messages": len(messages)})
- set_tag("sender", sender_user_id)
+ set_attribute("sender", sender_user_id)
local_messages = {}
remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for user_id, by_device in messages.items():
@@ -273,7 +273,7 @@ class DeviceMessageHandler:
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
- "org.matrix.opentracing_context": json_encoder.encode(context),
+ EventContentFields.TRACING_CONTEXT: json_encoder.encode(context),
}
# Add messages to the database.
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index c938339ddd..a3692f00d9 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -28,7 +28,7 @@ from twisted.internet import defer
from synapse.api.constants import EduTypes
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
-from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
+from synapse.logging.tracing import log_kv, set_attribute, tag_args, trace
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import (
JsonDict,
@@ -138,8 +138,8 @@ class E2eKeysHandler:
else:
remote_queries[user_id] = device_ids
- set_tag("local_key_query", str(local_query))
- set_tag("remote_key_query", str(remote_queries))
+ set_attribute("local_key_query", str(local_query))
+ set_attribute("remote_key_query", str(remote_queries))
# First get local devices.
# A map of destination -> failure response.
@@ -342,8 +342,8 @@ class E2eKeysHandler:
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
- set_tag("error", True)
- set_tag("reason", str(failure))
+ set_attribute("error", True)
+ set_attribute("reason", str(failure))
return
@@ -405,7 +405,7 @@ class E2eKeysHandler:
Returns:
A map from user_id -> device_id -> device details
"""
- set_tag("local_query", str(query))
+ set_attribute("local_query", str(query))
local_query: List[Tuple[str, Optional[str]]] = []
result_dict: Dict[str, Dict[str, dict]] = {}
@@ -420,7 +420,7 @@ class E2eKeysHandler:
"user_id": user_id,
}
)
- set_tag("error", True)
+ set_attribute("error", True)
raise SynapseError(400, "Not a user here")
if not device_ids:
@@ -477,8 +477,8 @@ class E2eKeysHandler:
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = one_time_keys
- set_tag("local_key_query", str(local_query))
- set_tag("remote_key_query", str(remote_queries))
+ set_attribute("local_key_query", str(local_query))
+ set_attribute("remote_key_query", str(remote_queries))
results = await self.store.claim_e2e_one_time_keys(local_query)
@@ -494,7 +494,7 @@ class E2eKeysHandler:
@trace
async def claim_client_keys(destination: str) -> None:
- set_tag("destination", destination)
+ set_attribute("destination", destination)
device_keys = remote_queries[destination]
try:
remote_result = await self.federation.claim_client_keys(
@@ -507,8 +507,8 @@ class E2eKeysHandler:
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
- set_tag("error", True)
- set_tag("reason", str(failure))
+ set_attribute("error", True)
+ set_attribute("reason", str(failure))
await make_deferred_yieldable(
defer.gatherResults(
@@ -611,7 +611,7 @@ class E2eKeysHandler:
result = await self.store.count_e2e_one_time_keys(user_id, device_id)
- set_tag("one_time_key_counts", str(result))
+ set_attribute("one_time_key_counts", str(result))
return {"one_time_key_counts": result}
async def _upload_one_time_keys_for_user(
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 28dc08c22a..8786534e54 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -25,7 +25,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
-from synapse.logging.opentracing import log_kv, trace
+from synapse.logging.tracing import log_kv, trace
from synapse.storage.databases.main.e2e_room_keys import RoomKey
from synapse.types import JsonDict
from synapse.util.async_helpers import Linearizer
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d42a414c90..8dc05d648d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -25,7 +25,12 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context
-from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
+from synapse.logging.tracing import (
+ SynapseTags,
+ log_kv,
+ set_attribute,
+ start_active_span,
+)
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.databases.main.event_push_actions import NotifCounts
from synapse.storage.roommember import MemberSummary
@@ -391,12 +396,12 @@ class SyncHandler:
indoctrination.
"""
with start_active_span("sync.current_sync_for_user"):
- log_kv({"since_token": since_token})
+ log_kv({"since_token": str(since_token)})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)
- set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
+ set_attribute(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
async def push_rules_for_user(self, user: UserID) -> Dict[str, Dict[str, list]]:
@@ -1084,7 +1089,7 @@ class SyncHandler:
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
- log_kv({"now_token": now_token})
+ log_kv({"now_token": str(now_token)})
logger.debug(
"Calculating sync response for %r between %s and %s",
@@ -1337,7 +1342,7 @@ class SyncHandler:
# `/sync`
message_id = message.pop("message_id", None)
if message_id:
- set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+ set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
@@ -1997,13 +2002,13 @@ class SyncHandler:
upto_token = room_builder.upto_token
with start_active_span("sync.generate_room_entry"):
- set_tag("room_id", room_id)
+ set_attribute("room_id", room_id)
log_kv({"events": len(events or ())})
log_kv(
{
- "since_token": since_token,
- "upto_token": upto_token,
+ "since_token": str(since_token),
+ "upto_token": str(upto_token),
}
)
@@ -2018,7 +2023,7 @@ class SyncHandler:
log_kv(
{
"batch_events": len(batch.events),
- "prev_batch": batch.prev_batch,
+ "prev_batch": str(batch.prev_batch),
"batch_limited": batch.limited,
}
)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 084d0a5b84..89bd403312 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -75,7 +75,13 @@ from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_u
from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
from synapse.logging.context import make_deferred_yieldable
-from synapse.logging.opentracing import set_tag, start_active_span, tags
+from synapse.logging.tracing import (
+ SpanAttributes,
+ SpanKind,
+ StatusCode,
+ set_status,
+ start_active_span,
+)
from synapse.types import ISynapseReactor
from synapse.util import json_decoder
from synapse.util.async_helpers import timeout_deferred
@@ -402,12 +408,11 @@ class SimpleHttpClient:
with start_active_span(
"outgoing-client-request",
- tags={
- tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
- tags.HTTP_METHOD: method,
- tags.HTTP_URL: uri,
+ kind=SpanKind.CLIENT,
+ attributes={
+ SpanAttributes.HTTP_METHOD: method,
+ SpanAttributes.HTTP_URL: uri,
},
- finish_on_close=True,
):
try:
body_producer = None
@@ -459,8 +464,7 @@ class SimpleHttpClient:
type(e).__name__,
e.args[0],
)
- set_tag(tags.ERROR, True)
- set_tag("error_reason", e.args[0])
+ set_status(StatusCode.ERROR, e)
raise
async def post_urlencoded_get_json(
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 3c35b1d2c7..00704a6a7c 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -72,9 +72,14 @@ from synapse.http.client import (
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.types import QueryParams
-from synapse.logging import opentracing
+from synapse.logging import tracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
-from synapse.logging.opentracing import set_tag, start_active_span, tags
+from synapse.logging.tracing import (
+ SpanAttributes,
+ SpanKind,
+ set_attribute,
+ start_active_span,
+)
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
@@ -517,18 +522,19 @@ class MatrixFederationHttpClient:
scope = start_active_span(
"outgoing-federation-request",
- tags={
- tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
- tags.PEER_ADDRESS: request.destination,
- tags.HTTP_METHOD: request.method,
- tags.HTTP_URL: request.path,
+ kind=SpanKind.CLIENT,
+ attributes={
+ SpanAttributes.HTTP_HOST: request.destination,
+ SpanAttributes.HTTP_METHOD: request.method,
+ SpanAttributes.HTTP_URL: request.path,
},
- finish_on_close=True,
)
# Inject the span into the headers
headers_dict: Dict[bytes, List[bytes]] = {}
- opentracing.inject_header_dict(headers_dict, request.destination)
+ tracing.inject_active_tracing_context_into_header_dict(
+ headers_dict, request.destination
+ )
headers_dict[b"User-Agent"] = [self.version_string_bytes]
@@ -614,7 +620,7 @@ class MatrixFederationHttpClient:
request.method, response.code
).inc()
- set_tag(tags.HTTP_STATUS_CODE, response.code)
+ set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.code)
response_phrase = response.phrase.decode("ascii", errors="replace")
if 200 <= response.code < 300:
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 19f42159b8..6420c0837b 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -61,13 +61,13 @@ from synapse.api.errors import (
from synapse.config.homeserver import HomeServerConfig
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
-from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
+from synapse.logging.tracing import get_active_span, start_active_span, trace_servlet
from synapse.util import json_encoder
from synapse.util.caches import intern_dict
from synapse.util.iterutils import chunk_seq
if TYPE_CHECKING:
- import opentracing
+ import opentelemetry
from synapse.server import HomeServer
@@ -329,7 +329,7 @@ class HttpServer(Protocol):
subsequent arguments will be any matched groups from the regex.
This should return either tuple of (code, response), or None.
servlet_classname (str): The name of the handler to be used in prometheus
- and opentracing logs.
+ and tracing logs.
"""
@@ -340,7 +340,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
requests by method, or override `_async_render` to handle all requests.
Args:
- extract_context: Whether to attempt to extract the opentracing
+ extract_context: Whether to attempt to extract the tracing
context from the request the servlet is handling.
"""
@@ -510,7 +510,7 @@ class JsonResource(DirectServeJsonResource):
callback: The handler for the request. Usually a Servlet
servlet_classname: The name of the handler to be used in prometheus
- and opentracing logs.
+ and tracing logs.
"""
method_bytes = method.encode("utf-8")
@@ -878,19 +878,19 @@ async def _async_write_json_to_request_in_thread(
expensive.
"""
- def encode(opentracing_span: "Optional[opentracing.Span]") -> bytes:
+ def encode(tracing_span: Optional["opentelemetry.trace.Span"]) -> bytes:
# it might take a while for the threadpool to schedule us, so we write
- # opentracing logs once we actually get scheduled, so that we can see how
+ # tracing logs once we actually get scheduled, so that we can see how
# much that contributed.
- if opentracing_span:
- opentracing_span.log_kv({"event": "scheduled"})
+ if tracing_span:
+ tracing_span.add_event("scheduled", attributes={"event": "scheduled"})
res = json_encoder(json_object)
- if opentracing_span:
- opentracing_span.log_kv({"event": "encoded"})
+ if tracing_span:
+ tracing_span.add_event("encoded", attributes={"event": "encoded"})
return res
with start_active_span("encode_json_response"):
- span = active_span()
+ span = get_active_span()
json_str = await defer_to_thread(request.reactor, encode, span)
_write_bytes_to_request(request, json_str)
diff --git a/synapse/http/site.py b/synapse/http/site.py
index eeec74b78a..d82c046dd7 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -37,7 +37,7 @@ from synapse.logging.context import (
from synapse.types import Requester
if TYPE_CHECKING:
- import opentracing
+ import opentelemetry
logger = logging.getLogger(__name__)
@@ -85,9 +85,9 @@ class SynapseRequest(Request):
# server name, for client requests this is the Requester object.
self._requester: Optional[Union[Requester, str]] = None
- # An opentracing span for this request. Will be closed when the request is
+ # A tracing span for this request. Will be closed when the request is
# completely processed.
- self._opentracing_span: "Optional[opentracing.Span]" = None
+ self._tracing_span: Optional["opentelemetry.trace.Span"] = None
# we can't yet create the logcontext, as we don't know the method.
self.logcontext: Optional[LoggingContext] = None
@@ -164,12 +164,12 @@ class SynapseRequest(Request):
# If there's no authenticated entity, it was the requester.
self.logcontext.request.authenticated_entity = authenticated_entity or requester
- def set_opentracing_span(self, span: "opentracing.Span") -> None:
- """attach an opentracing span to this request
+ def set_tracing_span(self, span: "opentelemetry.trace.Span") -> None:
+ """attach a tracing span to this request
Doing so will cause the span to be closed when we finish processing the request
"""
- self._opentracing_span = span
+ self._tracing_span = span
def get_request_id(self) -> str:
return "%s-%i" % (self.get_method(), self.request_seq)
@@ -309,8 +309,10 @@ class SynapseRequest(Request):
self._processing_finished_time = time.time()
self._is_processing = False
- if self._opentracing_span:
- self._opentracing_span.log_kv({"event": "finished processing"})
+ if self._tracing_span:
+ self._tracing_span.add_event(
+ "finished processing", attributes={"event": "finished processing"}
+ )
# if we've already sent the response, log it now; otherwise, we wait for the
# response to be sent.
@@ -325,8 +327,10 @@ class SynapseRequest(Request):
"""
self.finish_time = time.time()
Request.finish(self)
- if self._opentracing_span:
- self._opentracing_span.log_kv({"event": "response sent"})
+ if self._tracing_span:
+ self._tracing_span.add_event(
+ "response sent", attributes={"event": "response sent"}
+ )
if not self._is_processing:
assert self.logcontext is not None
with PreserveLoggingContext(self.logcontext):
@@ -361,9 +365,13 @@ class SynapseRequest(Request):
with PreserveLoggingContext(self.logcontext):
logger.info("Connection from client lost before response was sent")
- if self._opentracing_span:
- self._opentracing_span.log_kv(
- {"event": "client connection lost", "reason": str(reason.value)}
+ if self._tracing_span:
+ self._tracing_span.add_event(
+ "client connection lost",
+ attributes={
+ "event": "client connection lost",
+ "reason": str(reason.value),
+ },
)
if self._is_processing:
@@ -471,9 +479,9 @@ class SynapseRequest(Request):
usage.evt_db_fetch_count,
)
- # complete the opentracing span, if any.
- if self._opentracing_span:
- self._opentracing_span.finish()
+ # complete the tracing span, if any.
+ if self._tracing_span:
+ self._tracing_span.end()
try:
self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index fd9cb97920..a417b13ffd 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -46,7 +46,6 @@ from twisted.internet import defer, threads
from twisted.python.threadpool import ThreadPool
if TYPE_CHECKING:
- from synapse.logging.scopecontextmanager import _LogContextScope
from synapse.types import ISynapseReactor
logger = logging.getLogger(__name__)
@@ -221,14 +220,13 @@ LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"]
class _Sentinel:
"""Sentinel to represent the root context"""
- __slots__ = ["previous_context", "finished", "request", "scope", "tag"]
+ __slots__ = ["previous_context", "finished", "request", "tag"]
def __init__(self) -> None:
# Minimal set for compatibility with LoggingContext
self.previous_context = None
self.finished = False
self.request = None
- self.scope = None
self.tag = None
def __str__(self) -> str:
@@ -281,7 +279,6 @@ class LoggingContext:
"finished",
"request",
"tag",
- "scope",
]
def __init__(
@@ -302,7 +299,6 @@ class LoggingContext:
self.main_thread = get_thread_id()
self.request = None
self.tag = ""
- self.scope: Optional["_LogContextScope"] = None
# keep track of whether we have hit the __exit__ block for this context
# (suggesting that the the thing that created the context thinks it should
@@ -315,9 +311,6 @@ class LoggingContext:
# we track the current request_id
self.request = self.parent_context.request
- # we also track the current scope:
- self.scope = self.parent_context.scope
-
if request is not None:
# the request param overrides the request from the parent context
self.request = request
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
deleted file mode 100644
index c1aa205eed..0000000000
--- a/synapse/logging/opentracing.py
+++ /dev/null
@@ -1,972 +0,0 @@
-# Copyright 2019 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-# NOTE
-# This is a small wrapper around opentracing because opentracing is not currently
-# packaged downstream (specifically debian). Since opentracing instrumentation is
-# fairly invasive it was awkward to make it optional. As a result we opted to encapsulate
-# all opentracing state in these methods which effectively noop if opentracing is
-# not present. We should strongly consider encouraging the downstream distributers
-# to package opentracing and making opentracing a full dependency. In order to facilitate
-# this move the methods have work very similarly to opentracing's and it should only
-# be a matter of few regexes to move over to opentracing's access patterns proper.
-
-"""
-============================
-Using OpenTracing in Synapse
-============================
-
-Python-specific tracing concepts are at https://opentracing.io/guides/python/.
-Note that Synapse wraps OpenTracing in a small module (this one) in order to make the
-OpenTracing dependency optional. That means that the access patterns are
-different to those demonstrated in the OpenTracing guides. However, it is
-still useful to know, especially if OpenTracing is included as a full dependency
-in the future or if you are modifying this module.
-
-
-OpenTracing is encapsulated so that
-no span objects from OpenTracing are exposed in Synapse's code. This allows
-OpenTracing to be easily disabled in Synapse and thereby have OpenTracing as
-an optional dependency. This does however limit the number of modifiable spans
-at any point in the code to one. From here out references to `opentracing`
-in the code snippets refer to the Synapses module.
-Most methods provided in the module have a direct correlation to those provided
-by opentracing. Refer to docs there for a more in-depth documentation on some of
-the args and methods.
-
-Tracing
--------
-
-In Synapse it is not possible to start a non-active span. Spans can be started
-using the ``start_active_span`` method. This returns a scope (see
-OpenTracing docs) which is a context manager that needs to be entered and
-exited. This is usually done by using ``with``.
-
-.. code-block:: python
-
- from synapse.logging.opentracing import start_active_span
-
- with start_active_span("operation name"):
- # Do something we want to tracer
-
-Forgetting to enter or exit a scope will result in some mysterious and grievous log
-context errors.
-
-At anytime where there is an active span ``opentracing.set_tag`` can be used to
-set a tag on the current active span.
-
-Tracing functions
------------------
-
-Functions can be easily traced using decorators. The name of
-the function becomes the operation name for the span.
-
-.. code-block:: python
-
- from synapse.logging.opentracing import trace
-
- # Start a span using 'interesting_function' as the operation name
- @trace
- def interesting_function(*args, **kwargs):
- # Does all kinds of cool and expected things
- return something_usual_and_useful
-
-
-Operation names can be explicitly set for a function by using ``trace_with_opname``:
-
-.. code-block:: python
-
- from synapse.logging.opentracing import trace_with_opname
-
- @trace_with_opname("a_better_operation_name")
- def interesting_badly_named_function(*args, **kwargs):
- # Does all kinds of cool and expected things
- return something_usual_and_useful
-
-Setting Tags
-------------
-
-To set a tag on the active span do
-
-.. code-block:: python
-
- from synapse.logging.opentracing import set_tag
-
- set_tag(tag_name, tag_value)
-
-There's a convenient decorator to tag all the args of the method. It uses
-inspection in order to use the formal parameter names prefixed with 'ARG_' as
-tag names. It uses kwarg names as tag names without the prefix.
-
-.. code-block:: python
-
- from synapse.logging.opentracing import tag_args
-
- @tag_args
- def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"):
- pass
-
- set_fates("the story", "the end", "the act")
- # This will have the following tags
- # - ARG_clotho: "the story"
- # - ARG_lachesis: "the end"
- # - ARG_atropos: "the act"
- # - father: "Zues"
- # - mother: "Themis"
-
-Contexts and carriers
----------------------
-
-There are a selection of wrappers for injecting and extracting contexts from
-carriers provided. Unfortunately OpenTracing's three context injection
-techniques are not adequate for our inject of OpenTracing span-contexts into
-Twisted's http headers, EDU contents and our database tables. Also note that
-the binary encoding format mandated by OpenTracing is not actually implemented
-by jaeger_client v4.0.0 - it will silently noop.
-Please refer to the end of ``logging/opentracing.py`` for the available
-injection and extraction methods.
-
-Homeserver whitelisting
------------------------
-
-Most of the whitelist checks are encapsulated in the modules's injection
-and extraction method but be aware that using custom carriers or crossing
-unchartered waters will require the enforcement of the whitelist.
-``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes
-in a destination and compares it to the whitelist.
-
-Most injection methods take a 'destination' arg. The context will only be injected
-if the destination matches the whitelist or the destination is None.
-
-=======
-Gotchas
-=======
-
-- Checking whitelists on span propagation
-- Inserting pii
-- Forgetting to enter or exit a scope
-- Span source: make sure that the span you expect to be active across a
- function call really will be that one. Does the current function have more
- than one caller? Will all of those calling functions have be in a context
- with an active span?
-"""
-import contextlib
-import enum
-import inspect
-import logging
-import re
-from functools import wraps
-from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Collection,
- Dict,
- Generator,
- Iterable,
- List,
- Optional,
- Pattern,
- Type,
- TypeVar,
- Union,
- cast,
- overload,
-)
-
-import attr
-from typing_extensions import ParamSpec
-
-from twisted.internet import defer
-from twisted.web.http import Request
-from twisted.web.http_headers import Headers
-
-from synapse.config import ConfigError
-from synapse.util import json_decoder, json_encoder
-
-if TYPE_CHECKING:
- from synapse.http.site import SynapseRequest
- from synapse.server import HomeServer
-
-# Helper class
-
-
-class _DummyTagNames:
- """wrapper of opentracings tags. We need to have them if we
- want to reference them without opentracing around. Clearly they
- should never actually show up in a trace. `set_tags` overwrites
- these with the correct ones."""
-
- INVALID_TAG = "invalid-tag"
- COMPONENT = INVALID_TAG
- DATABASE_INSTANCE = INVALID_TAG
- DATABASE_STATEMENT = INVALID_TAG
- DATABASE_TYPE = INVALID_TAG
- DATABASE_USER = INVALID_TAG
- ERROR = INVALID_TAG
- HTTP_METHOD = INVALID_TAG
- HTTP_STATUS_CODE = INVALID_TAG
- HTTP_URL = INVALID_TAG
- MESSAGE_BUS_DESTINATION = INVALID_TAG
- PEER_ADDRESS = INVALID_TAG
- PEER_HOSTNAME = INVALID_TAG
- PEER_HOST_IPV4 = INVALID_TAG
- PEER_HOST_IPV6 = INVALID_TAG
- PEER_PORT = INVALID_TAG
- PEER_SERVICE = INVALID_TAG
- SAMPLING_PRIORITY = INVALID_TAG
- SERVICE = INVALID_TAG
- SPAN_KIND = INVALID_TAG
- SPAN_KIND_CONSUMER = INVALID_TAG
- SPAN_KIND_PRODUCER = INVALID_TAG
- SPAN_KIND_RPC_CLIENT = INVALID_TAG
- SPAN_KIND_RPC_SERVER = INVALID_TAG
-
-
-try:
- import opentracing
- import opentracing.tags
-
- tags = opentracing.tags
-except ImportError:
- opentracing = None # type: ignore[assignment]
- tags = _DummyTagNames # type: ignore[assignment]
-try:
- from jaeger_client import Config as JaegerConfig
-
- from synapse.logging.scopecontextmanager import LogContextScopeManager
-except ImportError:
- JaegerConfig = None # type: ignore
- LogContextScopeManager = None # type: ignore
-
-
-try:
- from rust_python_jaeger_reporter import Reporter
-
- # jaeger-client 4.7.0 requires that reporters inherit from BaseReporter, which
- # didn't exist before that version.
- try:
- from jaeger_client.reporter import BaseReporter
- except ImportError:
-
- class BaseReporter: # type: ignore[no-redef]
- pass
-
- @attr.s(slots=True, frozen=True, auto_attribs=True)
- class _WrappedRustReporter(BaseReporter):
- """Wrap the reporter to ensure `report_span` never throws."""
-
- _reporter: Reporter = attr.Factory(Reporter)
-
- def set_process(self, *args: Any, **kwargs: Any) -> None:
- return self._reporter.set_process(*args, **kwargs)
-
- def report_span(self, span: "opentracing.Span") -> None:
- try:
- return self._reporter.report_span(span)
- except Exception:
- logger.exception("Failed to report span")
-
- RustReporter: Optional[Type[_WrappedRustReporter]] = _WrappedRustReporter
-except ImportError:
- RustReporter = None
-
-
-logger = logging.getLogger(__name__)
-
-
-class SynapseTags:
- # The message ID of any to_device message processed
- TO_DEVICE_MESSAGE_ID = "to_device.message_id"
-
- # Whether the sync response has new data to be returned to the client.
- SYNC_RESULT = "sync.new_data"
-
- # incoming HTTP request ID (as written in the logs)
- REQUEST_ID = "request_id"
-
- # HTTP request tag (used to distinguish full vs incremental syncs, etc)
- REQUEST_TAG = "request_tag"
-
- # Text description of a database transaction
- DB_TXN_DESC = "db.txn_desc"
-
- # Uniqueish ID of a database transaction
- DB_TXN_ID = "db.txn_id"
-
- # The name of the external cache
- CACHE_NAME = "cache.name"
-
-
-class SynapseBaggage:
- FORCE_TRACING = "synapse-force-tracing"
-
-
-# Block everything by default
-# A regex which matches the server_names to expose traces for.
-# None means 'block everything'.
-_homeserver_whitelist: Optional[Pattern[str]] = None
-
-# Util methods
-
-
-class _Sentinel(enum.Enum):
- # defining a sentinel in this way allows mypy to correctly handle the
- # type of a dictionary lookup.
- sentinel = object()
-
-
-P = ParamSpec("P")
-R = TypeVar("R")
-T = TypeVar("T")
-
-
-def only_if_tracing(func: Callable[P, R]) -> Callable[P, Optional[R]]:
- """Executes the function only if we're tracing. Otherwise returns None."""
-
- @wraps(func)
- def _only_if_tracing_inner(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
- if opentracing:
- return func(*args, **kwargs)
- else:
- return None
-
- return _only_if_tracing_inner
-
-
-@overload
-def ensure_active_span(
- message: str,
-) -> Callable[[Callable[P, R]], Callable[P, Optional[R]]]:
- ...
-
-
-@overload
-def ensure_active_span(
- message: str, ret: T
-) -> Callable[[Callable[P, R]], Callable[P, Union[T, R]]]:
- ...
-
-
-def ensure_active_span(
- message: str, ret: Optional[T] = None
-) -> Callable[[Callable[P, R]], Callable[P, Union[Optional[T], R]]]:
- """Executes the operation only if opentracing is enabled and there is an active span.
- If there is no active span it logs message at the error level.
-
- Args:
- message: Message which fills in "There was no active span when trying to %s"
- in the error log if there is no active span and opentracing is enabled.
- ret: return value if opentracing is None or there is no active span.
-
- Returns:
- The result of the func, falling back to ret if opentracing is disabled or there
- was no active span.
- """
-
- def ensure_active_span_inner_1(
- func: Callable[P, R]
- ) -> Callable[P, Union[Optional[T], R]]:
- @wraps(func)
- def ensure_active_span_inner_2(
- *args: P.args, **kwargs: P.kwargs
- ) -> Union[Optional[T], R]:
- if not opentracing:
- return ret
-
- if not opentracing.tracer.active_span:
- logger.error(
- "There was no active span when trying to %s."
- " Did you forget to start one or did a context slip?",
- message,
- stack_info=True,
- )
-
- return ret
-
- return func(*args, **kwargs)
-
- return ensure_active_span_inner_2
-
- return ensure_active_span_inner_1
-
-
-# Setup
-
-
-def init_tracer(hs: "HomeServer") -> None:
- """Set the whitelists and initialise the JaegerClient tracer"""
- global opentracing
- if not hs.config.tracing.opentracer_enabled:
- # We don't have a tracer
- opentracing = None # type: ignore[assignment]
- return
-
- if not opentracing or not JaegerConfig:
- raise ConfigError(
- "The server has been configured to use opentracing but opentracing is not "
- "installed."
- )
-
- # Pull out the jaeger config if it was given. Otherwise set it to something sensible.
- # See https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/config.py
-
- set_homeserver_whitelist(hs.config.tracing.opentracer_whitelist)
-
- from jaeger_client.metrics.prometheus import PrometheusMetricsFactory
-
- config = JaegerConfig(
- config=hs.config.tracing.jaeger_config,
- service_name=f"{hs.config.server.server_name} {hs.get_instance_name()}",
- scope_manager=LogContextScopeManager(),
- metrics_factory=PrometheusMetricsFactory(),
- )
-
- # If we have the rust jaeger reporter available let's use that.
- if RustReporter:
- logger.info("Using rust_python_jaeger_reporter library")
- assert config.sampler is not None
- tracer = config.create_tracer(RustReporter(), config.sampler)
- opentracing.set_global_tracer(tracer)
- else:
- config.initialize_tracer()
-
-
-# Whitelisting
-
-
-@only_if_tracing
-def set_homeserver_whitelist(homeserver_whitelist: Iterable[str]) -> None:
- """Sets the homeserver whitelist
-
- Args:
- homeserver_whitelist: regexes specifying whitelisted homeservers
- """
- global _homeserver_whitelist
- if homeserver_whitelist:
- # Makes a single regex which accepts all passed in regexes in the list
- _homeserver_whitelist = re.compile(
- "({})".format(")|(".join(homeserver_whitelist))
- )
-
-
-@only_if_tracing
-def whitelisted_homeserver(destination: str) -> bool:
- """Checks if a destination matches the whitelist
-
- Args:
- destination
- """
-
- if _homeserver_whitelist:
- return _homeserver_whitelist.match(destination) is not None
- return False
-
-
-# Start spans and scopes
-
-# Could use kwargs but I want these to be explicit
-def start_active_span(
- operation_name: str,
- child_of: Optional[Union["opentracing.Span", "opentracing.SpanContext"]] = None,
- references: Optional[List["opentracing.Reference"]] = None,
- tags: Optional[Dict[str, str]] = None,
- start_time: Optional[float] = None,
- ignore_active_span: bool = False,
- finish_on_close: bool = True,
- *,
- tracer: Optional["opentracing.Tracer"] = None,
-) -> "opentracing.Scope":
- """Starts an active opentracing span.
-
- Records the start time for the span, and sets it as the "active span" in the
- scope manager.
-
- Args:
- See opentracing.tracer
- Returns:
- scope (Scope) or contextlib.nullcontext
- """
-
- if opentracing is None:
- return contextlib.nullcontext() # type: ignore[unreachable]
-
- if tracer is None:
- # use the global tracer by default
- tracer = opentracing.tracer
-
- return tracer.start_active_span(
- operation_name,
- child_of=child_of,
- references=references,
- tags=tags,
- start_time=start_time,
- ignore_active_span=ignore_active_span,
- finish_on_close=finish_on_close,
- )
-
-
-def start_active_span_follows_from(
- operation_name: str,
- contexts: Collection,
- child_of: Optional[Union["opentracing.Span", "opentracing.SpanContext"]] = None,
- start_time: Optional[float] = None,
- *,
- inherit_force_tracing: bool = False,
- tracer: Optional["opentracing.Tracer"] = None,
-) -> "opentracing.Scope":
- """Starts an active opentracing span, with additional references to previous spans
-
- Args:
- operation_name: name of the operation represented by the new span
- contexts: the previous spans to inherit from
-
- child_of: optionally override the parent span. If unset, the currently active
- span will be the parent. (If there is no currently active span, the first
- span in `contexts` will be the parent.)
-
- start_time: optional override for the start time of the created span. Seconds
- since the epoch.
-
- inherit_force_tracing: if set, and any of the previous contexts have had tracing
- forced, the new span will also have tracing forced.
- tracer: override the opentracing tracer. By default the global tracer is used.
- """
- if opentracing is None:
- return contextlib.nullcontext() # type: ignore[unreachable]
-
- references = [opentracing.follows_from(context) for context in contexts]
- scope = start_active_span(
- operation_name,
- child_of=child_of,
- references=references,
- start_time=start_time,
- tracer=tracer,
- )
-
- if inherit_force_tracing and any(
- is_context_forced_tracing(ctx) for ctx in contexts
- ):
- force_tracing(scope.span)
-
- return scope
-
-
-def start_active_span_from_edu(
- edu_content: Dict[str, Any],
- operation_name: str,
- references: Optional[List["opentracing.Reference"]] = None,
- tags: Optional[Dict[str, str]] = None,
- start_time: Optional[float] = None,
- ignore_active_span: bool = False,
- finish_on_close: bool = True,
-) -> "opentracing.Scope":
- """
- Extracts a span context from an edu and uses it to start a new active span
-
- Args:
- edu_content: an edu_content with a `context` field whose value is
- canonical json for a dict which contains opentracing information.
-
- For the other args see opentracing.tracer
- """
- references = references or []
-
- if opentracing is None:
- return contextlib.nullcontext() # type: ignore[unreachable]
-
- carrier = json_decoder.decode(edu_content.get("context", "{}")).get(
- "opentracing", {}
- )
- context = opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
- _references = [
- opentracing.child_of(span_context_from_string(x))
- for x in carrier.get("references", [])
- ]
-
- # For some reason jaeger decided not to support the visualization of multiple parent
- # spans or explicitly show references. I include the span context as a tag here as
- # an aid to people debugging but it's really not an ideal solution.
-
- references += _references
-
- scope = opentracing.tracer.start_active_span(
- operation_name,
- child_of=context,
- references=references,
- tags=tags,
- start_time=start_time,
- ignore_active_span=ignore_active_span,
- finish_on_close=finish_on_close,
- )
-
- scope.span.set_tag("references", carrier.get("references", []))
- return scope
-
-
-# Opentracing setters for tags, logs, etc
-@only_if_tracing
-def active_span() -> Optional["opentracing.Span"]:
- """Get the currently active span, if any"""
- return opentracing.tracer.active_span
-
-
-@ensure_active_span("set a tag")
-def set_tag(key: str, value: Union[str, bool, int, float]) -> None:
- """Sets a tag on the active span"""
- assert opentracing.tracer.active_span is not None
- opentracing.tracer.active_span.set_tag(key, value)
-
-
-@ensure_active_span("log")
-def log_kv(key_values: Dict[str, Any], timestamp: Optional[float] = None) -> None:
- """Log to the active span"""
- assert opentracing.tracer.active_span is not None
- opentracing.tracer.active_span.log_kv(key_values, timestamp)
-
-
-@ensure_active_span("set the traces operation name")
-def set_operation_name(operation_name: str) -> None:
- """Sets the operation name of the active span"""
- assert opentracing.tracer.active_span is not None
- opentracing.tracer.active_span.set_operation_name(operation_name)
-
-
-@only_if_tracing
-def force_tracing(
- span: Union["opentracing.Span", _Sentinel] = _Sentinel.sentinel
-) -> None:
- """Force sampling for the active/given span and its children.
-
- Args:
- span: span to force tracing for. By default, the active span.
- """
- if isinstance(span, _Sentinel):
- span_to_trace = opentracing.tracer.active_span
- else:
- span_to_trace = span
- if span_to_trace is None:
- logger.error("No active span in force_tracing")
- return
-
- span_to_trace.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1)
-
- # also set a bit of baggage, so that we have a way of figuring out if
- # it is enabled later
- span_to_trace.set_baggage_item(SynapseBaggage.FORCE_TRACING, "1")
-
-
-def is_context_forced_tracing(
- span_context: Optional["opentracing.SpanContext"],
-) -> bool:
- """Check if sampling has been force for the given span context."""
- if span_context is None:
- return False
- return span_context.baggage.get(SynapseBaggage.FORCE_TRACING) is not None
-
-
-# Injection and extraction
-
-
-@ensure_active_span("inject the span into a header dict")
-def inject_header_dict(
- headers: Dict[bytes, List[bytes]],
- destination: Optional[str] = None,
- check_destination: bool = True,
-) -> None:
- """
- Injects a span context into a dict of HTTP headers
-
- Args:
- headers: the dict to inject headers into
- destination: address of entity receiving the span context. Must be given unless
- check_destination is False. The context will only be injected if the
- destination matches the opentracing whitelist
- check_destination (bool): If false, destination will be ignored and the context
- will always be injected.
-
- Note:
- The headers set by the tracer are custom to the tracer implementation which
- should be unique enough that they don't interfere with any headers set by
- synapse or twisted. If we're still using jaeger these headers would be those
- here:
- https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
- """
- if check_destination:
- if destination is None:
- raise ValueError(
- "destination must be given unless check_destination is False"
- )
- if not whitelisted_homeserver(destination):
- return
-
- span = opentracing.tracer.active_span
-
- carrier: Dict[str, str] = {}
- assert span is not None
- opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier)
-
- for key, value in carrier.items():
- headers[key.encode()] = [value.encode()]
-
-
-def inject_response_headers(response_headers: Headers) -> None:
- """Inject the current trace id into the HTTP response headers"""
- if not opentracing:
- return
- span = opentracing.tracer.active_span
- if not span:
- return
-
- # This is a bit implementation-specific.
- #
- # Jaeger's Spans have a trace_id property; other implementations (including the
- # dummy opentracing.span.Span which we use if init_tracer is not called) do not
- # expose it
- trace_id = getattr(span, "trace_id", None)
-
- if trace_id is not None:
- response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}")
-
-
-@ensure_active_span(
- "get the active span context as a dict", ret=cast(Dict[str, str], {})
-)
-def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str]:
- """
- Gets a span context as a dict. This can be used instead of manually
- injecting a span into an empty carrier.
-
- Args:
- destination: the name of the remote server.
-
- Returns:
- dict: the active span's context if opentracing is enabled, otherwise empty.
- """
-
- if destination and not whitelisted_homeserver(destination):
- return {}
-
- carrier: Dict[str, str] = {}
- assert opentracing.tracer.active_span is not None
- opentracing.tracer.inject(
- opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier
- )
-
- return carrier
-
-
-@ensure_active_span("get the span context as a string.", ret={})
-def active_span_context_as_string() -> str:
- """
- Returns:
- The active span context encoded as a string.
- """
- carrier: Dict[str, str] = {}
- if opentracing:
- assert opentracing.tracer.active_span is not None
- opentracing.tracer.inject(
- opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier
- )
- return json_encoder.encode(carrier)
-
-
-def span_context_from_request(request: Request) -> "Optional[opentracing.SpanContext]":
- """Extract an opentracing context from the headers on an HTTP request
-
- This is useful when we have received an HTTP request from another part of our
- system, and want to link our spans to those of the remote system.
- """
- if not opentracing:
- return None
- header_dict = {
- k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
- }
- return opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
-
-
-@only_if_tracing
-def span_context_from_string(carrier: str) -> Optional["opentracing.SpanContext"]:
- """
- Returns:
- The active span context decoded from a string.
- """
- payload: Dict[str, str] = json_decoder.decode(carrier)
- return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, payload)
-
-
-@only_if_tracing
-def extract_text_map(carrier: Dict[str, str]) -> Optional["opentracing.SpanContext"]:
- """
- Wrapper method for opentracing's tracer.extract for TEXT_MAP.
- Args:
- carrier: a dict possibly containing a span context.
-
- Returns:
- The active span context extracted from carrier.
- """
- return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
-
-
-# Tracing decorators
-
-
-def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]:
- """
- Decorator to trace a function with a custom opname.
-
- See the module's doc string for usage examples.
-
- """
-
- def decorator(func: Callable[P, R]) -> Callable[P, R]:
- if opentracing is None:
- return func # type: ignore[unreachable]
-
- if inspect.iscoroutinefunction(func):
-
- @wraps(func)
- async def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R:
- with start_active_span(opname):
- return await func(*args, **kwargs) # type: ignore[misc]
-
- else:
- # The other case here handles both sync functions and those
- # decorated with inlineDeferred.
- @wraps(func)
- def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R:
- scope = start_active_span(opname)
- scope.__enter__()
-
- try:
- result = func(*args, **kwargs)
- if isinstance(result, defer.Deferred):
-
- def call_back(result: R) -> R:
- scope.__exit__(None, None, None)
- return result
-
- def err_back(result: R) -> R:
- scope.__exit__(None, None, None)
- return result
-
- result.addCallbacks(call_back, err_back)
-
- else:
- if inspect.isawaitable(result):
- logger.error(
- "@trace may not have wrapped %s correctly! "
- "The function is not async but returned a %s.",
- func.__qualname__,
- type(result).__name__,
- )
-
- scope.__exit__(None, None, None)
-
- return result
-
- except Exception as e:
- scope.__exit__(type(e), None, e.__traceback__)
- raise
-
- return _trace_inner # type: ignore[return-value]
-
- return decorator
-
-
-def trace(func: Callable[P, R]) -> Callable[P, R]:
- """
- Decorator to trace a function.
-
- Sets the operation name to that of the function's name.
-
- See the module's doc string for usage examples.
- """
-
- return trace_with_opname(func.__name__)(func)
-
-
-def tag_args(func: Callable[P, R]) -> Callable[P, R]:
- """
- Tags all of the args to the active span.
- """
-
- if not opentracing:
- return func
-
- @wraps(func)
- def _tag_args_inner(*args: P.args, **kwargs: P.kwargs) -> R:
- argspec = inspect.getfullargspec(func)
- for i, arg in enumerate(argspec.args[1:]):
- set_tag("ARG_" + arg, str(args[i])) # type: ignore[index]
- set_tag("args", str(args[len(argspec.args) :])) # type: ignore[index]
- set_tag("kwargs", str(kwargs))
- return func(*args, **kwargs)
-
- return _tag_args_inner
-
-
-@contextlib.contextmanager
-def trace_servlet(
- request: "SynapseRequest", extract_context: bool = False
-) -> Generator[None, None, None]:
- """Returns a context manager which traces a request. It starts a span
- with some servlet specific tags such as the request metrics name and
- request information.
-
- Args:
- request
- extract_context: Whether to attempt to extract the opentracing
- context from the request the servlet is handling.
- """
-
- if opentracing is None:
- yield # type: ignore[unreachable]
- return
-
- request_tags = {
- SynapseTags.REQUEST_ID: request.get_request_id(),
- tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
- tags.HTTP_METHOD: request.get_method(),
- tags.HTTP_URL: request.get_redacted_uri(),
- tags.PEER_HOST_IPV6: request.getClientAddress().host,
- }
-
- request_name = request.request_metrics.name
- context = span_context_from_request(request) if extract_context else None
-
- # we configure the scope not to finish the span immediately on exit, and instead
- # pass the span into the SynapseRequest, which will finish it once we've finished
- # sending the response to the client.
- scope = start_active_span(request_name, child_of=context, finish_on_close=False)
- request.set_opentracing_span(scope.span)
-
- with scope:
- inject_response_headers(request.responseHeaders)
- try:
- yield
- finally:
- # We set the operation name again in case its changed (which happens
- # with JsonResource).
- scope.span.set_operation_name(request.request_metrics.name)
-
- # set the tags *after* the servlet completes, in case it decided to
- # prioritise the span (tags will get dropped on unprioritised spans)
- request_tags[
- SynapseTags.REQUEST_TAG
- ] = request.request_metrics.start_context.tag
-
- for k, v in request_tags.items():
- scope.span.set_tag(k, v)
diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py
deleted file mode 100644
index 10877bdfc5..0000000000
--- a/synapse/logging/scopecontextmanager.py
+++ /dev/null
@@ -1,171 +0,0 @@
-# Copyright 2019 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.import logging
-
-import logging
-from types import TracebackType
-from typing import Optional, Type
-
-from opentracing import Scope, ScopeManager, Span
-
-import twisted
-
-from synapse.logging.context import (
- LoggingContext,
- current_context,
- nested_logging_context,
-)
-
-logger = logging.getLogger(__name__)
-
-
-class LogContextScopeManager(ScopeManager):
- """
- The LogContextScopeManager tracks the active scope in opentracing
- by using the log contexts which are native to synapse. This is so
- that the basic opentracing api can be used across twisted defereds.
-
- It would be nice just to use opentracing's ContextVarsScopeManager,
- but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
- """
-
- def __init__(self) -> None:
- pass
-
- @property
- def active(self) -> Optional[Scope]:
- """
- Returns the currently active Scope which can be used to access the
- currently active Scope.span.
- If there is a non-null Scope, its wrapped Span
- becomes an implicit parent of any newly-created Span at
- Tracer.start_active_span() time.
-
- Return:
- The Scope that is active, or None if not available.
- """
- ctx = current_context()
- return ctx.scope
-
- def activate(self, span: Span, finish_on_close: bool) -> Scope:
- """
- Makes a Span active.
- Args
- span: the span that should become active.
- finish_on_close: whether Span should be automatically finished when
- Scope.close() is called.
-
- Returns:
- Scope to control the end of the active period for
- *span*. It is a programming error to neglect to call
- Scope.close() on the returned instance.
- """
-
- ctx = current_context()
-
- if not ctx:
- logger.error("Tried to activate scope outside of loggingcontext")
- return Scope(None, span) # type: ignore[arg-type]
-
- if ctx.scope is not None:
- # start a new logging context as a child of the existing one.
- # Doing so -- rather than updating the existing logcontext -- means that
- # creating several concurrent spans under the same logcontext works
- # correctly.
- ctx = nested_logging_context("")
- enter_logcontext = True
- else:
- # if there is no span currently associated with the current logcontext, we
- # just store the scope in it.
- #
- # This feels a bit dubious, but it does hack around a problem where a
- # span outlasts its parent logcontext (which would otherwise lead to
- # "Re-starting finished log context" errors).
- enter_logcontext = False
-
- scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
- ctx.scope = scope
- if enter_logcontext:
- ctx.__enter__()
-
- return scope
-
-
-class _LogContextScope(Scope):
- """
- A custom opentracing scope, associated with a LogContext
-
- * filters out _DefGen_Return exceptions which arise from calling
- `defer.returnValue` in Twisted code
-
- * When the scope is closed, the logcontext's active scope is reset to None.
- and - if enter_logcontext was set - the logcontext is finished too.
- """
-
- def __init__(
- self,
- manager: LogContextScopeManager,
- span: Span,
- logcontext: LoggingContext,
- enter_logcontext: bool,
- finish_on_close: bool,
- ):
- """
- Args:
- manager:
- the manager that is responsible for this scope.
- span:
- the opentracing span which this scope represents the local
- lifetime for.
- logcontext:
- the log context to which this scope is attached.
- enter_logcontext:
- if True the log context will be exited when the scope is finished
- finish_on_close:
- if True finish the span when the scope is closed
- """
- super().__init__(manager, span)
- self.logcontext = logcontext
- self._finish_on_close = finish_on_close
- self._enter_logcontext = enter_logcontext
-
- def __exit__(
- self,
- exc_type: Optional[Type[BaseException]],
- value: Optional[BaseException],
- traceback: Optional[TracebackType],
- ) -> None:
- if exc_type == twisted.internet.defer._DefGen_Return:
- # filter out defer.returnValue() calls
- exc_type = value = traceback = None
- super().__exit__(exc_type, value, traceback)
-
- def __str__(self) -> str:
- return f"Scope<{self.span}>"
-
- def close(self) -> None:
- active_scope = self.manager.active
- if active_scope is not self:
- logger.error(
- "Closing scope %s which is not the currently-active one %s",
- self,
- active_scope,
- )
-
- if self._finish_on_close:
- self.span.finish()
-
- self.logcontext.scope = None
-
- if self._enter_logcontext:
- self.logcontext.__exit__(None, None, None)
diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py
new file mode 100644
index 0000000000..e3a1a010a2
--- /dev/null
+++ b/synapse/logging/tracing.py
@@ -0,0 +1,942 @@
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# NOTE This is a small wrapper around opentelemetry because tracing is optional
+# and not always packaged downstream. Since opentelemetry instrumentation is
+# fairly invasive it was awkward to make it optional. As a result we opted to
+# encapsulate all opentelemetry state in these methods which effectively noop if
+# opentelemetry is not present. We should strongly consider encouraging the
+# downstream distributors to package opentelemetry and making opentelemetry a
+# full dependency. In order to facilitate this move, the methods have been made
+# to work very similarly to opentelemetry's, and it should only be a matter of a
+# few regexes to move over to opentelemetry's access patterns proper.
+
+"""
+==============================
+Using OpenTelemetry in Synapse
+==============================
+
+Python-specific tracing concepts are at
+https://opentelemetry.io/docs/instrumentation/python/. Note that Synapse wraps
+OpenTelemetry in a small module (this one) in order to make the OpenTelemetry
+dependency optional. That means that some access patterns are different to those
+demonstrated in the OpenTelemetry guides. However, it is still useful to know,
+especially if OpenTelemetry is included as a full dependency in the future or if
+you are modifying this module.
+
+
+OpenTelemetry is encapsulated so that no span objects from OpenTelemetry are
+exposed in Synapse's code. This allows OpenTelemetry to be easily disabled in
+Synapse and thereby have OpenTelemetry as an optional dependency. This does
+however limit the number of modifiable spans at any point in the code to one.
+From here on out, references to `tracing` in the code snippets refer to Synapse's
+module. Most methods provided in the module have a direct correlation to those
+provided by OpenTelemetry. Refer to docs there for a more in-depth documentation
+on some of the args and methods.
+
+Tracing
+-------
+
+In Synapse, it is not possible to start a non-active span. Spans can be started
+using the ``start_active_span`` method. This returns a context manager that
+needs to be entered and exited to expose the ``span``. This is usually done by
+using a ``with`` statement.
+
+.. code-block:: python
+
+ from synapse.logging.tracing import start_active_span
+
+ with start_active_span("operation name"):
+ # Do something we want to trace
+
+Forgetting to enter or exit a scope will result in unstarted and unfinished
+spans that will not be reported (exported).
+
+At anytime where there is an active span ``set_attribute`` can be
+used to set a tag on the current active span.
+
+Tracing functions
+-----------------
+
+Functions can be easily traced using decorators. The name of the function
+becomes the operation name for the span.
+
+.. code-block:: python
+
+ from synapse.logging.tracing import trace
+
+ # Start a span using 'interesting_function' as the operation name
+ @trace
+ def interesting_function(*args, **kwargs):
+ # Does all kinds of cool and expected things return
+ something_usual_and_useful
+
+
+Operation names can be explicitly set for a function by using
+``trace_with_opname``:
+
+.. code-block:: python
+
+ from synapse.logging.tracing import trace_with_opname
+
+ @trace_with_opname("a_better_operation_name")
+ def interesting_badly_named_function(*args, **kwargs):
+ # Does all kinds of cool and expected things return
+ something_usual_and_useful
+
+Setting Tags
+------------
+
+To set a tag on the active span do
+
+.. code-block:: python
+
+ from synapse.logging.tracing import set_attribute
+
+ set_attribute(tag_name, tag_value)
+
+There's a convenient decorator to tag all the args of the method. It uses
+inspection in order to use the formal parameter names prefixed with 'ARG_' as
+tag names. It uses kwarg names as tag names without the prefix.
+
+.. code-block:: python
+ from synapse.logging.tracing import tag_args
+ @tag_args
+ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"):
+ pass
+ set_fates("the story", "the end", "the act")
+ # This will have the following tags
+ # - ARG_clotho: "the story"
+ # - ARG_lachesis: "the end"
+ # - ARG_atropos: "the act"
+ # - father: "Zues"
+ # - mother: "Themis"
+
+Contexts and carriers
+---------------------
+
+There are a selection of wrappers for injecting and extracting contexts from
+carriers provided. We use these to inject OpenTelemetry Contexts into
+Twisted's http headers, EDU contents and our database tables. Please refer to
+the end of ``logging/tracing.py`` for the available injection and extraction
+methods.
+
+Homeserver whitelisting
+-----------------------
+
+Most of the whitelist checks are encapsulated in the module's injection and
+extraction methods, but be aware that using custom carriers or crossing
+uncharted waters will require the enforcement of the whitelist.
+``logging/tracing.py`` has a ``whitelisted_homeserver`` method which takes
+in a destination and compares it to the whitelist.
+
+Most injection methods take a 'destination' arg. The context will only be
+injected if the destination matches the whitelist or the destination is None.
+
+=======
+Gotchas
+=======
+
+- Checking whitelists on span propagation
+- Inserting pii
+- Forgetting to enter or exit a scope
+- Span source: make sure that the span you expect to be active across a function
+ call really will be that one. Does the current function have more than one
+  caller? Will all of those calling functions be in a context with an
+ active span?
+"""
+import contextlib
+import inspect
+import logging
+import re
+from abc import ABC
+from functools import wraps
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Callable,
+ ContextManager,
+ Dict,
+ Generator,
+ Iterable,
+ List,
+ Optional,
+ Pattern,
+ Sequence,
+ TypeVar,
+ Union,
+ cast,
+ overload,
+)
+
+from typing_extensions import ParamSpec
+
+from twisted.internet import defer
+from twisted.web.http import Request
+from twisted.web.http_headers import Headers
+
+from synapse.api.constants import EventContentFields
+from synapse.config import ConfigError
+from synapse.util import json_decoder
+
+if TYPE_CHECKING:
+ from synapse.http.site import SynapseRequest
+ from synapse.server import HomeServer
+
+# Helper class
+
+T = TypeVar("T")
+
+
+class _DummyLookup(object):
+ """This will always returns the fixed value given for any accessed property"""
+
+ def __init__(self, value: T) -> None:
+ self.value = value
+
+ def __getattribute__(self, name: str) -> T:
+ return object.__getattribute__(self, "value")
+
+
+class DummyLink(ABC):
+ """Dummy placeholder for `opentelemetry.trace.Link`"""
+
+ def __init__(self) -> None:
+ self.not_implemented_message = (
+ "opentelemetry isn't installed so this is just a dummy link placeholder"
+ )
+
+ @property
+ def context(self) -> None:
+ raise NotImplementedError(self.not_implemented_message)
+
+ @property
+ def attributes(self) -> None:
+ raise NotImplementedError(self.not_implemented_message)
+
+
+# These dependencies are optional so they can fail to import, and we must be
+# prepared to fall back to the no-op stand-ins defined above when they do.
+try:
+ import opentelemetry
+ import opentelemetry.exporter.jaeger.thrift
+ import opentelemetry.propagate
+ import opentelemetry.sdk.resources
+ import opentelemetry.sdk.trace
+ import opentelemetry.sdk.trace.export
+ import opentelemetry.semconv.trace
+ import opentelemetry.trace
+ import opentelemetry.trace.propagation
+ import opentelemetry.trace.status
+
+ SpanKind = opentelemetry.trace.SpanKind
+ SpanAttributes = opentelemetry.semconv.trace.SpanAttributes
+ StatusCode = opentelemetry.trace.status.StatusCode
+ Link = opentelemetry.trace.Link
+except ImportError:
+ opentelemetry = None # type: ignore[assignment]
+ SpanKind = _DummyLookup(0) # type: ignore
+ SpanAttributes = _DummyLookup("fake-attribute") # type: ignore
+ StatusCode = _DummyLookup(0) # type: ignore
+ Link = DummyLink # type: ignore
+
+
+logger = logging.getLogger(__name__)
+
+
+class SynapseTags:
+ """FIXME: Rename to `SynapseAttributes` so it matches OpenTelemetry `SpanAttributes`"""
+
+ # The message ID of any to_device message processed
+ TO_DEVICE_MESSAGE_ID = "to_device.message_id"
+
+ # Whether the sync response has new data to be returned to the client.
+ SYNC_RESULT = "sync.new_data"
+
+ # incoming HTTP request ID (as written in the logs)
+ REQUEST_ID = "request_id"
+
+ # HTTP request tag (used to distinguish full vs incremental syncs, etc)
+ REQUEST_TAG = "request_tag"
+
+ # Text description of a database transaction
+ DB_TXN_DESC = "db.txn_desc"
+
+ # Uniqueish ID of a database transaction
+ DB_TXN_ID = "db.txn_id"
+
+ # The name of the external cache
+ CACHE_NAME = "cache.name"
+
+
+class SynapseBaggage:
+ FORCE_TRACING = "synapse-force-tracing"
+
+
+# Block everything by default
+# A regex which matches the server_names to expose traces for.
+# None means 'block everything'.
+_homeserver_whitelist: Optional[Pattern[str]] = None
+
+# Util methods
+
+
+P = ParamSpec("P")
+R = TypeVar("R")
+
+
+def only_if_tracing(func: Callable[P, R]) -> Callable[P, Optional[R]]:
+ """Decorator function that executes the function only if we're tracing. Otherwise returns None."""
+
+ @wraps(func)
+ def _only_if_tracing_inner(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
+ if opentelemetry:
+ return func(*args, **kwargs)
+ else:
+ return None
+
+ return _only_if_tracing_inner
+
+
+@overload
+def ensure_active_span(
+ message: str,
+) -> Callable[[Callable[P, R]], Callable[P, Optional[R]]]:
+ ...
+
+
+@overload
+def ensure_active_span(
+ message: str, ret: T
+) -> Callable[[Callable[P, R]], Callable[P, Union[T, R]]]:
+ ...
+
+
+def ensure_active_span(
+ message: str, ret: Optional[T] = None
+) -> Callable[[Callable[P, R]], Callable[P, Union[Optional[T], R]]]:
+ """Executes the operation only if opentelemetry is enabled and there is an active span.
+ If there is no active span it logs message at the error level.
+
+ Args:
+ message: Message which fills in "There was no active span when trying to %s"
+ in the error log if there is no active span and opentelemetry is enabled.
+ ret: return value if opentelemetry is None or there is no active span.
+
+ Returns:
+ The result of the func, falling back to ret if opentelemetry is disabled or there
+ was no active span.
+ """
+
+ def ensure_active_span_inner_1(
+ func: Callable[P, R]
+ ) -> Callable[P, Union[Optional[T], R]]:
+ @wraps(func)
+ def ensure_active_span_inner_2(
+ *args: P.args, **kwargs: P.kwargs
+ ) -> Union[Optional[T], R]:
+ if not opentelemetry:
+ return ret
+
+ if not opentelemetry.trace.get_current_span():
+ logger.error(
+ "There was no active span when trying to %s."
+ " Did you forget to start one or did a context slip?",
+ message,
+ stack_info=True,
+ )
+
+ return ret
+
+ return func(*args, **kwargs)
+
+ return ensure_active_span_inner_2
+
+ return ensure_active_span_inner_1
+
+
+# Setup
+
+
+def init_tracer(hs: "HomeServer") -> None:
+ """Set the whitelists and initialise the OpenTelemetry tracer"""
+ global opentelemetry
+ if not hs.config.tracing.tracing_enabled:
+ # We don't have a tracer
+ opentelemetry = None # type: ignore[assignment]
+ return
+
+ if not opentelemetry:
+ raise ConfigError(
+ "The server has been configured to use OpenTelemetry but OpenTelemetry is not "
+ "installed."
+ )
+
+ # Pull out of the config if it was given. Otherwise set it to something sensible.
+ set_homeserver_whitelist(hs.config.tracing.homeserver_whitelist)
+
+ resource = opentelemetry.sdk.resources.Resource(
+ attributes={
+ opentelemetry.sdk.resources.SERVICE_NAME: f"{hs.config.server.server_name} {hs.get_instance_name()}"
+ }
+ )
+
+ # TODO: `force_tracing_for_users` is not compatible with OTEL samplers
+ # because you can only determine `opentelemetry.trace.TraceFlags.SAMPLED`
+ # and whether it uses a recording span when the span is created and we don't
+    # have enough information at that time (we can only determine it later, in
+    # `synapse/api/auth.py`). There isn't a way to change the trace flags after
+ # the fact so there is no way to programmatically force
+ # recording/tracing/sampling like there was in opentracing.
+ sampler = opentelemetry.sdk.trace.sampling.ParentBasedTraceIdRatio(
+ hs.config.tracing.sample_rate
+ )
+
+ tracer_provider = opentelemetry.sdk.trace.TracerProvider(
+ resource=resource, sampler=sampler
+ )
+
+ # consoleProcessor = opentelemetry.sdk.trace.export.BatchSpanProcessor(
+ # opentelemetry.sdk.trace.export.ConsoleSpanExporter()
+ # )
+ # tracer_provider.add_span_processor(consoleProcessor)
+
+ jaeger_exporter = opentelemetry.exporter.jaeger.thrift.JaegerExporter(
+ **hs.config.tracing.jaeger_exporter_config
+ )
+ jaeger_processor = opentelemetry.sdk.trace.export.BatchSpanProcessor(
+ jaeger_exporter
+ )
+ tracer_provider.add_span_processor(jaeger_processor)
+
+ # Sets the global default tracer provider
+ opentelemetry.trace.set_tracer_provider(tracer_provider)
+
+
+# Whitelisting
+
+
+@only_if_tracing
+def set_homeserver_whitelist(homeserver_whitelist: Iterable[str]) -> None:
+ """Sets the homeserver whitelist
+
+ Args:
+ homeserver_whitelist: regexes specifying whitelisted homeservers
+ """
+ global _homeserver_whitelist
+ if homeserver_whitelist:
+ # Makes a single regex which accepts all passed in regexes in the list
+ _homeserver_whitelist = re.compile(
+ "({})".format(")|(".join(homeserver_whitelist))
+ )
+
+
+@only_if_tracing
+def whitelisted_homeserver(destination: str) -> bool:
+ """Checks if a destination matches the whitelist
+
+ Args:
+ destination
+ """
+
+ if _homeserver_whitelist:
+ return _homeserver_whitelist.match(destination) is not None
+ return False
+
+
+# Start spans and scopes
+
+
+def use_span(
+ span: "opentelemetry.trace.Span",
+ end_on_exit: bool = True,
+) -> ContextManager["opentelemetry.trace.Span"]:
+ if opentelemetry is None:
+ return contextlib.nullcontext() # type: ignore[unreachable]
+
+ return opentelemetry.trace.use_span(span=span, end_on_exit=end_on_exit)
+
+
+def create_non_recording_span() -> "opentelemetry.trace.Span":
+ """Create a no-op span that does not record or become part of a recorded trace"""
+
+ return opentelemetry.trace.NonRecordingSpan(
+ opentelemetry.trace.INVALID_SPAN_CONTEXT
+ )
+
+
+def start_span(
+ name: str,
+ *,
+ context: Optional["opentelemetry.context.context.Context"] = None,
+ kind: Optional["opentelemetry.trace.SpanKind"] = SpanKind.INTERNAL,
+ attributes: "opentelemetry.util.types.Attributes" = None,
+ links: Optional[Sequence["opentelemetry.trace.Link"]] = None,
+ start_time: Optional[int] = None,
+ record_exception: bool = True,
+ set_status_on_exception: bool = True,
+ end_on_exit: bool = True,
+ # For testing only
+ tracer: Optional["opentelemetry.trace.Tracer"] = None,
+) -> "opentelemetry.trace.Span":
+ if opentelemetry is None:
+ raise Exception("Not able to create span without opentelemetry installed.")
+
+ if tracer is None:
+ tracer = opentelemetry.trace.get_tracer(__name__)
+
+ # TODO: Why is this necessary to satisfy this error? It has a default?
+ # ` error: Argument "kind" to "start_span" of "Tracer" has incompatible type "Optional[SpanKind]"; expected "SpanKind" [arg-type]`
+ if kind is None:
+ kind = SpanKind.INTERNAL
+
+ return tracer.start_span(
+ name=name,
+ context=context,
+ kind=kind,
+ attributes=attributes,
+ links=links,
+ start_time=start_time,
+ record_exception=record_exception,
+ set_status_on_exception=set_status_on_exception,
+ )
+
+
+def start_active_span(
+ name: str,
+ *,
+ context: Optional["opentelemetry.context.context.Context"] = None,
+ kind: Optional["opentelemetry.trace.SpanKind"] = SpanKind.INTERNAL,
+ attributes: "opentelemetry.util.types.Attributes" = None,
+ links: Optional[Sequence["opentelemetry.trace.Link"]] = None,
+ start_time: Optional[int] = None,
+ record_exception: bool = True,
+ set_status_on_exception: bool = True,
+ end_on_exit: bool = True,
+ # For testing only
+ tracer: Optional["opentelemetry.trace.Tracer"] = None,
+) -> ContextManager["opentelemetry.trace.Span"]:
+ if opentelemetry is None:
+ return contextlib.nullcontext() # type: ignore[unreachable]
+
+ # TODO: Why is this necessary to satisfy this error? It has a default?
+ # ` error: Argument "kind" to "start_span" of "Tracer" has incompatible type "Optional[SpanKind]"; expected "SpanKind" [arg-type]`
+ if kind is None:
+ kind = SpanKind.INTERNAL
+
+ span = start_span(
+ name=name,
+ context=context,
+ kind=kind,
+ attributes=attributes,
+ links=links,
+ start_time=start_time,
+ record_exception=record_exception,
+ set_status_on_exception=set_status_on_exception,
+ tracer=tracer,
+ )
+
+ # Equivalent to `tracer.start_as_current_span`
+ return opentelemetry.trace.use_span(
+ span,
+ end_on_exit=end_on_exit,
+ record_exception=record_exception,
+ set_status_on_exception=set_status_on_exception,
+ )
+
+
+def start_active_span_from_edu(
+ operation_name: str,
+ *,
+ edu_content: Dict[str, Any],
+) -> ContextManager["opentelemetry.trace.Span"]:
+ """
+ Extracts a span context from an edu and uses it to start a new active span
+
+ Args:
+ operation_name: The label for the chunk of time used to process the given edu.
+ edu_content: an edu_content with a `context` field whose value is
+ canonical json for a dict which contains tracing information.
+ """
+ if opentelemetry is None:
+ return contextlib.nullcontext() # type: ignore[unreachable]
+
+ carrier = json_decoder.decode(
+ edu_content.get(EventContentFields.TRACING_CONTEXT, "{}")
+ )
+
+ context = extract_text_map(carrier)
+
+ return start_active_span(name=operation_name, context=context)
+
+
+# OpenTelemetry setters for attributes, logs, etc
+@only_if_tracing
+def get_active_span() -> Optional["opentelemetry.trace.Span"]:
+ """Get the currently active span, if any"""
+ return opentelemetry.trace.get_current_span()
+
+
+def get_span_context_from_context(
+ context: "opentelemetry.context.context.Context",
+) -> Optional["opentelemetry.trace.SpanContext"]:
+ """Utility function to convert a `Context` to a `SpanContext`
+
+ Based on https://github.com/open-telemetry/opentelemetry-python/blob/43288ca9a36144668797c11ca2654836ec8b5e99/opentelemetry-api/src/opentelemetry/trace/propagation/tracecontext.py#L99-L102
+ """
+ span = opentelemetry.trace.get_current_span(context=context)
+ span_context = span.get_span_context()
+ if span_context == opentelemetry.trace.INVALID_SPAN_CONTEXT:
+ return None
+ return span_context
+
+
+def get_context_from_span(
+ span: "opentelemetry.trace.Span",
+) -> "opentelemetry.context.context.Context":
+ # This doesn't affect the current context at all, it just converts a span
+ # into `Context` object basically (bad name).
+ ctx = opentelemetry.trace.propagation.set_span_in_context(span)
+ return ctx
+
+
+@ensure_active_span("set a tag")
+def set_attribute(key: str, value: Union[str, bool, int, float]) -> None:
+ """Sets a tag on the active span"""
+ active_span = get_active_span()
+ assert active_span is not None
+ active_span.set_attribute(key, value)
+
+
+@ensure_active_span("set the status")
+def set_status(
+ status_code: "opentelemetry.trace.status.StatusCode", exc: Optional[Exception]
+) -> None:
+ """Sets a tag on the active span"""
+ active_span = get_active_span()
+ assert active_span is not None
+ active_span.set_status(opentelemetry.trace.status.Status(status_code=status_code))
+ if exc:
+ active_span.record_exception(exc)
+
+
+DEFAULT_LOG_NAME = "log"
+
+
+@ensure_active_span("log")
+def log_kv(key_values: Dict[str, Any], timestamp: Optional[int] = None) -> None:
+ """Log to the active span"""
+ active_span = get_active_span()
+ assert active_span is not None
+ event_name = key_values.get("event", DEFAULT_LOG_NAME)
+ active_span.add_event(event_name, attributes=key_values, timestamp=timestamp)
+
+
+@only_if_tracing
+def force_tracing(span: Optional["opentelemetry.trace.Span"] = None) -> None:
+ """Force sampling for the active/given span and its children.
+
+ Args:
+ span: span to force tracing for. By default, the active span.
+ """
+ # TODO
+ pass
+
+
+def is_context_forced_tracing(
+ context: "opentelemetry.context.context.Context",
+) -> bool:
+ """Check if sampling has been force for the given span context."""
+ # TODO
+ return False
+
+
+# Injection and extraction
+
+
+@ensure_active_span("inject the active tracing context into a header dict")
+def inject_active_tracing_context_into_header_dict(
+ headers: Dict[bytes, List[bytes]],
+ destination: Optional[str] = None,
+ check_destination: bool = True,
+) -> None:
+ """
+ Injects the active tracing context into a dict of HTTP headers
+
+ Args:
+ headers: the dict to inject headers into
+ destination: address of entity receiving the span context. Must be given unless
+ `check_destination` is False.
+ check_destination (bool): If False, destination will be ignored and the context
+ will always be injected. If True, the context will only be injected if the
+ destination matches the tracing allowlist
+
+ Note:
+ The headers set by the tracer are custom to the tracer implementation which
+ should be unique enough that they don't interfere with any headers set by
+ synapse or twisted.
+ """
+ if check_destination:
+ if destination is None:
+ raise ValueError(
+ "destination must be given unless check_destination is False"
+ )
+ if not whitelisted_homeserver(destination):
+ return
+
+ active_span = get_active_span()
+ assert active_span is not None
+ ctx = get_context_from_span(active_span)
+
+ propagator = opentelemetry.propagate.get_global_textmap()
+ # Put all of SpanContext properties into the headers dict
+ propagator.inject(headers, context=ctx)
+
+
+def inject_trace_id_into_response_headers(response_headers: Headers) -> None:
+ """Inject the current trace id into the HTTP response headers"""
+ if not opentelemetry:
+ return
+ active_span = get_active_span()
+ if not active_span:
+ return
+
+ trace_id = active_span.get_span_context().trace_id
+
+ if trace_id is not None:
+ response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}")
+
+
+@ensure_active_span(
+ "get the active span context as a dict", ret=cast(Dict[str, str], {})
+)
+def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str]:
+ """
+ Gets the active tracing Context serialized as a dict. This can be used
+ instead of manually injecting a span into an empty carrier.
+
+ Args:
+ destination: the name of the remote server.
+
+ Returns:
+ dict: the serialized active span's context if opentelemetry is enabled, otherwise
+ empty.
+ """
+ if destination and not whitelisted_homeserver(destination):
+ return {}
+
+ active_span = get_active_span()
+ assert active_span is not None
+ ctx = get_context_from_span(active_span)
+
+ carrier_text_map: Dict[str, str] = {}
+ propagator = opentelemetry.propagate.get_global_textmap()
+ # Put all of Context properties onto the carrier text map that we can return
+ propagator.inject(carrier_text_map, context=ctx)
+
+ return carrier_text_map
+
+
+def context_from_request(
+ request: Request,
+) -> Optional["opentelemetry.context.context.Context"]:
+ """Extract an opentelemetry context from the headers on an HTTP request
+
+ This is useful when we have received an HTTP request from another part of our
+ system, and want to link our spans to those of the remote system.
+ """
+ if not opentelemetry:
+ return None
+ header_dict = {
+ k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
+ }
+
+ # Extract all of the relevant values from the headers to construct a
+ # SpanContext to return.
+ return extract_text_map(header_dict)
+
+
+@only_if_tracing
+def extract_text_map(
+ carrier: Dict[str, str]
+) -> Optional["opentelemetry.context.context.Context"]:
+ """
+ Wrapper method for opentelemetry's propagator.extract for TEXT_MAP.
+ Args:
+ carrier: a dict possibly containing a context.
+
+ Returns:
+ The active context extracted from carrier.
+ """
+ propagator = opentelemetry.propagate.get_global_textmap()
+ # Extract all of the relevant values from the `carrier` to construct a
+ # Context to return.
+ return propagator.extract(carrier)
+
+
+# Tracing decorators
+
+
+def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]:
+ """
+ Decorator to trace a function with a custom opname.
+
+ See the module's doc string for usage examples.
+
+ """
+
+ def decorator(func: Callable[P, R]) -> Callable[P, R]:
+ if opentelemetry is None:
+ return func # type: ignore[unreachable]
+
+ if inspect.iscoroutinefunction(func):
+
+ @wraps(func)
+ async def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R:
+ with start_active_span(opname):
+ return await func(*args, **kwargs) # type: ignore[misc]
+
+ else:
+ # The other case here handles both sync functions and those
+ # decorated with inlineDeferred.
+ @wraps(func)
+ def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R:
+ scope = start_active_span(opname)
+ scope.__enter__()
+
+ try:
+ result = func(*args, **kwargs)
+ if isinstance(result, defer.Deferred):
+
+ def call_back(result: R) -> R:
+ scope.__exit__(None, None, None)
+ return result
+
+ def err_back(result: R) -> R:
+ scope.__exit__(None, None, None)
+ return result
+
+ result.addCallbacks(call_back, err_back)
+
+ else:
+ if inspect.isawaitable(result):
+ logger.error(
+ "@trace may not have wrapped %s correctly! "
+ "The function is not async but returned a %s.",
+ func.__qualname__,
+ type(result).__name__,
+ )
+
+ scope.__exit__(None, None, None)
+
+ return result
+
+ except Exception as e:
+ scope.__exit__(type(e), None, e.__traceback__)
+ raise
+
+ return _trace_inner # type: ignore[return-value]
+
+ return decorator
+
+
+def trace(func: Callable[P, R]) -> Callable[P, R]:
+ """
+ Decorator to trace a function.
+
+ Sets the operation name to that of the function's name.
+
+ See the module's doc string for usage examples.
+ """
+
+ return trace_with_opname(func.__name__)(func)
+
+
+def tag_args(func: Callable[P, R]) -> Callable[P, R]:
+ """
+ Tags all of the args to the active span.
+ """
+
+ if not opentelemetry:
+ return func
+
+ @wraps(func)
+ def _tag_args_inner(*args: P.args, **kwargs: P.kwargs) -> R:
+ argspec = inspect.getfullargspec(func)
+ for i, arg in enumerate(argspec.args[1:]):
+ set_attribute("ARG_" + arg, str(args[i])) # type: ignore[index]
+ set_attribute("args", str(args[len(argspec.args) :])) # type: ignore[index]
+ set_attribute("kwargs", str(kwargs))
+ return func(*args, **kwargs)
+
+ return _tag_args_inner
+
+
+@contextlib.contextmanager
+def trace_servlet(
+ request: "SynapseRequest", extract_context: bool = False
+) -> Generator[None, None, None]:
+ """Returns a context manager which traces a request. It starts a span
+ with some servlet specific tags such as the request metrics name and
+ request information.
+
+ Args:
+ request
+ extract_context: Whether to attempt to extract the tracing
+ context from the request the servlet is handling.
+ """
+
+ if opentelemetry is None:
+ yield # type: ignore[unreachable]
+ return
+
+ attrs = {
+ SynapseTags.REQUEST_ID: request.get_request_id(),
+ SpanAttributes.HTTP_METHOD: request.get_method(),
+ SpanAttributes.HTTP_URL: request.get_redacted_uri(),
+ SpanAttributes.HTTP_HOST: request.getClientAddress().host,
+ }
+
+ request_name = request.request_metrics.name
+ tracing_context = context_from_request(request) if extract_context else None
+
+    # This will end up being the root span for all of the servlet's traces, and
+    # we aren't able to determine whether to force tracing yet. We can determine
+    # whether to force tracing later in `synapse/api/auth.py`.
+ with start_active_span(
+ request_name,
+ kind=SpanKind.SERVER,
+ context=tracing_context,
+ attributes=attrs,
+ # we configure the span not to finish immediately on exiting the scope,
+ # and instead pass the span into the SynapseRequest (via
+ # `request.set_tracing_span(span)`), which will finish it once we've
+ # finished sending the response to the client.
+ end_on_exit=False,
+ ) as span:
+ request.set_tracing_span(span)
+
+ inject_trace_id_into_response_headers(request.responseHeaders)
+ try:
+ yield
+ finally:
+ # We set the operation name again in case its changed (which happens
+ # with JsonResource).
+ span.update_name(request.request_metrics.name)
+
+ if request.request_metrics.start_context.tag is not None:
+ span.set_attribute(
+ SynapseTags.REQUEST_TAG, request.request_metrics.start_context.tag
+ )
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 7a1516d3a8..59d956dd9d 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -42,7 +42,7 @@ from synapse.logging.context import (
LoggingContext,
PreserveLoggingContext,
)
-from synapse.logging.opentracing import SynapseTags, start_active_span
+from synapse.logging.tracing import SynapseTags, start_active_span
from synapse.metrics._types import Collector
if TYPE_CHECKING:
@@ -208,7 +208,7 @@ def run_as_background_process(
Args:
desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
- bg_start_span: Whether to start an opentracing span. Defaults to True.
+ bg_start_span: Whether to start a tracing span. Defaults to True.
Should only be disabled for processes that will not log to or tag
a span.
args: positional args for func
@@ -232,7 +232,8 @@ def run_as_background_process(
try:
if bg_start_span:
ctx = start_active_span(
- f"bgproc.{desc}", tags={SynapseTags.REQUEST_ID: str(context)}
+ f"bgproc.{desc}",
+ attributes={SynapseTags.REQUEST_ID: str(context)},
)
else:
ctx = nullcontext() # type: ignore[assignment]
diff --git a/synapse/notifier.py b/synapse/notifier.py
index c42bb8266a..8fd8cb8100 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -39,7 +39,7 @@ from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
-from synapse.logging.opentracing import log_kv, start_active_span
+from synapse.logging.tracing import log_kv, start_active_span
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
from synapse.types import (
@@ -536,7 +536,7 @@ class Notifier:
log_kv(
{
"wait_for_events": "sleep",
- "token": prev_token,
+ "token": str(prev_token),
}
)
@@ -546,7 +546,7 @@ class Notifier:
log_kv(
{
"wait_for_events": "woken",
- "token": user_stream.current_token,
+ "token": str(user_stream.current_token),
}
)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index e96fb45e9f..11299367d2 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -23,7 +23,7 @@ from twisted.internet.interfaces import IDelayedCall
from synapse.api.constants import EventTypes
from synapse.events import EventBase
-from synapse.logging import opentracing
+from synapse.logging import tracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.storage.databases.main.event_push_actions import HttpPushAction
@@ -198,9 +198,9 @@ class HttpPusher(Pusher):
)
for push_action in unprocessed:
- with opentracing.start_active_span(
+ with tracing.start_active_span(
"http-push",
- tags={
+ attributes={
"authenticated_entity": self.user_id,
"event_id": push_action.event_id,
"app_id": self.app_id,
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 561ad5bf04..af160e31aa 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -28,8 +28,8 @@ from synapse.api.errors import HttpResponseException, SynapseError
from synapse.http import RequestTimedOutError
from synapse.http.server import HttpServer, is_method_cancellable
from synapse.http.site import SynapseRequest
-from synapse.logging import opentracing
-from synapse.logging.opentracing import trace_with_opname
+from synapse.logging import tracing
+from synapse.logging.tracing import trace_with_opname
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
@@ -248,7 +248,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [b"Bearer " + replication_secret]
- opentracing.inject_header_dict(headers, check_destination=False)
+ tracing.inject_active_tracing_context_into_header_dict(
+ headers, check_destination=False
+ )
try:
# Keep track of attempts made so we can bail if we don't manage to
diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py
index a448dd7eb1..e928fded36 100644
--- a/synapse/replication/tcp/external_cache.py
+++ b/synapse/replication/tcp/external_cache.py
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Any, Optional
from prometheus_client import Counter, Histogram
-from synapse.logging import opentracing
+from synapse.logging import tracing
from synapse.logging.context import make_deferred_yieldable
from synapse.util import json_decoder, json_encoder
@@ -94,9 +94,9 @@ class ExternalCache:
logger.debug("Caching %s %s: %r", cache_name, key, encoded_value)
- with opentracing.start_active_span(
+ with tracing.start_active_span(
"ExternalCache.set",
- tags={opentracing.SynapseTags.CACHE_NAME: cache_name},
+ attributes={tracing.SynapseTags.CACHE_NAME: cache_name},
):
with response_timer.labels("set").time():
return await make_deferred_yieldable(
@@ -113,9 +113,9 @@ class ExternalCache:
if self._redis_connection is None:
return None
- with opentracing.start_active_span(
+ with tracing.start_active_span(
"ExternalCache.get",
- tags={opentracing.SynapseTags.CACHE_NAME: cache_name},
+ attributes={tracing.SynapseTags.CACHE_NAME: cache_name},
):
with response_timer.labels("get").time():
result = await make_deferred_yieldable(
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index e3f454896a..a592fd2cfb 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -26,7 +26,7 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.http.site import SynapseRequest
-from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname
+from synapse.logging.tracing import log_kv, set_attribute, trace_with_opname
from synapse.types import JsonDict, StreamToken
from ._base import client_patterns, interactive_auth_handler
@@ -88,7 +88,7 @@ class KeyUploadServlet(RestServlet):
user_id
)
if dehydrated_device is not None and device_id != dehydrated_device[0]:
- set_tag("error", True)
+ set_attribute("error", True)
log_kv(
{
"message": "Client uploading keys for a different device",
@@ -204,13 +204,13 @@ class KeyChangesServlet(RestServlet):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
from_token_string = parse_string(request, "from", required=True)
- set_tag("from", from_token_string)
+ set_attribute("from", from_token_string)
# We want to enforce they do pass us one, but we ignore it and return
# changes after the "to" as well as before.
#
# XXX This does not enforce that "to" is passed.
- set_tag("to", str(parse_string(request, "to")))
+ set_attribute("to", str(parse_string(request, "to")))
from_token = await StreamToken.from_string(self.store, from_token_string)
diff --git a/synapse/rest/client/knock.py b/synapse/rest/client/knock.py
index ad025c8a45..8201f2bd86 100644
--- a/synapse/rest/client/knock.py
+++ b/synapse/rest/client/knock.py
@@ -24,7 +24,7 @@ from synapse.http.servlet import (
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
-from synapse.logging.opentracing import set_tag
+from synapse.logging.tracing import set_attribute
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.types import JsonDict, RoomAlias, RoomID
@@ -97,7 +97,7 @@ class KnockRoomAliasServlet(RestServlet):
def on_PUT(
self, request: SynapseRequest, room_identifier: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
- set_tag("txn_id", txn_id)
+ set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_identifier, txn_id
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 2f513164cb..3880846e9a 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -46,7 +46,7 @@ from synapse.http.servlet import (
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
-from synapse.logging.opentracing import set_tag
+from synapse.logging.tracing import set_attribute
from synapse.rest.client._base import client_patterns
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.storage.state import StateFilter
@@ -82,7 +82,7 @@ class RoomCreateRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
- set_tag("txn_id", txn_id)
+ set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(request, self.on_POST, request)
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
@@ -194,7 +194,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
if txn_id:
- set_tag("txn_id", txn_id)
+ set_attribute("txn_id", txn_id)
content = parse_json_object_from_request(request)
@@ -229,7 +229,7 @@ class RoomStateEventRestServlet(TransactionRestServlet):
except ShadowBanError:
event_id = "$" + random_string(43)
- set_tag("event_id", event_id)
+ set_attribute("event_id", event_id)
ret = {"event_id": event_id}
return 200, ret
@@ -279,7 +279,7 @@ class RoomSendEventRestServlet(TransactionRestServlet):
except ShadowBanError:
event_id = "$" + random_string(43)
- set_tag("event_id", event_id)
+ set_attribute("event_id", event_id)
return 200, {"event_id": event_id}
def on_GET(
@@ -290,7 +290,7 @@ class RoomSendEventRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_id: str, event_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
- set_tag("txn_id", txn_id)
+ set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, event_type, txn_id
@@ -348,7 +348,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_identifier: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
- set_tag("txn_id", txn_id)
+ set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_identifier, txn_id
@@ -816,7 +816,7 @@ class RoomForgetRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_id: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
- set_tag("txn_id", txn_id)
+ set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, txn_id
@@ -916,7 +916,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
def on_PUT(
self, request: SynapseRequest, room_id: str, membership_action: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
- set_tag("txn_id", txn_id)
+ set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, membership_action, txn_id
@@ -962,13 +962,13 @@ class RoomRedactEventRestServlet(TransactionRestServlet):
except ShadowBanError:
event_id = "$" + random_string(43)
- set_tag("event_id", event_id)
+ set_attribute("event_id", event_id)
return 200, {"event_id": event_id}
def on_PUT(
self, request: SynapseRequest, room_id: str, event_id: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
- set_tag("txn_id", txn_id)
+ set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self.on_POST, request, room_id, event_id, txn_id
diff --git a/synapse/rest/client/sendtodevice.py b/synapse/rest/client/sendtodevice.py
index 1a8e9a96d4..a3f8fdb317 100644
--- a/synapse/rest/client/sendtodevice.py
+++ b/synapse/rest/client/sendtodevice.py
@@ -19,7 +19,7 @@ from synapse.http import servlet
from synapse.http.server import HttpServer
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
from synapse.http.site import SynapseRequest
-from synapse.logging.opentracing import set_tag, trace_with_opname
+from synapse.logging.tracing import set_attribute, trace_with_opname
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.types import JsonDict
@@ -47,8 +47,8 @@ class SendToDeviceRestServlet(servlet.RestServlet):
def on_PUT(
self, request: SynapseRequest, message_type: str, txn_id: str
) -> Awaitable[Tuple[int, JsonDict]]:
- set_tag("message_type", message_type)
- set_tag("txn_id", txn_id)
+ set_attribute("message_type", message_type)
+ set_attribute("txn_id", txn_id)
return self.txns.fetch_or_execute_request(
request, self._put, request, message_type, txn_id
)
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index c2989765ce..5ddb08eb2f 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -37,7 +37,7 @@ from synapse.handlers.sync import (
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
from synapse.http.site import SynapseRequest
-from synapse.logging.opentracing import trace_with_opname
+from synapse.logging.tracing import trace_with_opname
from synapse.types import JsonDict, StreamToken
from synapse.util import json_decoder
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index cf98b0ab48..f34c067515 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -45,8 +45,8 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
-from synapse.logging import opentracing
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
+from synapse.logging.tracing import Link, get_active_span, start_active_span, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
@@ -118,7 +118,7 @@ times_pruned_extremities = Counter(
class _PersistEventsTask:
"""A batch of events to persist."""
- name: ClassVar[str] = "persist_event_batch" # used for opentracing
+ name: ClassVar[str] = "persist_event_batch" # used for tracing
events_and_contexts: List[Tuple[EventBase, EventContext]]
backfilled: bool
@@ -139,7 +139,7 @@ class _PersistEventsTask:
class _UpdateCurrentStateTask:
"""A room whose current state needs recalculating."""
- name: ClassVar[str] = "update_current_state" # used for opentracing
+ name: ClassVar[str] = "update_current_state" # used for tracing
def try_merge(self, task: "_EventPersistQueueTask") -> bool:
"""Deduplicates consecutive recalculations of current state."""
@@ -154,11 +154,11 @@ class _EventPersistQueueItem:
task: _EventPersistQueueTask
deferred: ObservableDeferred
- parent_opentracing_span_contexts: List = attr.ib(factory=list)
- """A list of opentracing spans waiting for this batch"""
+ parent_tracing_span_contexts: List = attr.ib(factory=list)
+ """A list of contexts of the tracing spans waiting for this batch"""
- opentracing_span_context: Any = None
- """The opentracing span under which the persistence actually happened"""
+ tracing_span_context: Any = None
+ """The tracing span under which the persistence actually happened"""
_PersistResult = TypeVar("_PersistResult")
@@ -222,10 +222,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
)
queue.append(end_item)
- # also add our active opentracing span to the item so that we get a link back
- span = opentracing.active_span()
+ # also add our active tracing span to the item so that we get a link back
+ span = get_active_span()
if span:
- end_item.parent_opentracing_span_contexts.append(span.context)
+ end_item.parent_tracing_span_contexts.append(span.get_span_context())
# start a processor for the queue, if there isn't one already
self._handle_queue(room_id)
@@ -233,9 +233,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
# wait for the queue item to complete
res = await make_deferred_yieldable(end_item.deferred.observe())
- # add another opentracing span which links to the persist trace.
- with opentracing.start_active_span_follows_from(
- f"{task.name}_complete", (end_item.opentracing_span_context,)
+ # add another tracing span which links to the persist trace.
+ with start_active_span(
+ f"{task.name}_complete",
+ links=[Link(end_item.tracing_span_context)],
):
pass
@@ -266,13 +267,15 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
- with opentracing.start_active_span_follows_from(
+ with start_active_span(
item.task.name,
- item.parent_opentracing_span_contexts,
- inherit_force_tracing=True,
- ) as scope:
- if scope:
- item.opentracing_span_context = scope.span.context
+ links=[
+ Link(span_context)
+ for span_context in item.parent_tracing_span_contexts
+ ],
+ ) as span:
+ if span:
+ item.tracing_span_context = span.get_span_context()
ret = await self._per_item_callback(room_id, item.task)
except Exception:
@@ -355,7 +358,7 @@ class EventsPersistenceStorageController:
f"Found an unexpected task type in event persistence queue: {task}"
)
- @opentracing.trace
+ @trace
async def persist_events(
self,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
@@ -418,7 +421,7 @@ class EventsPersistenceStorageController:
self.main_store.get_room_max_token(),
)
- @opentracing.trace
+ @trace
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b394a6658b..ca0f606797 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -47,7 +47,7 @@ from twisted.internet.interfaces import IReactorCore
from synapse.api.errors import StoreError
from synapse.config.database import DatabaseConnectionConfig
-from synapse.logging import opentracing
+from synapse.logging import tracing
from synapse.logging.context import (
LoggingContext,
current_context,
@@ -422,11 +422,11 @@ class LoggingTransaction:
start = time.time()
try:
- with opentracing.start_active_span(
+ with tracing.start_active_span(
"db.query",
- tags={
- opentracing.tags.DATABASE_TYPE: "sql",
- opentracing.tags.DATABASE_STATEMENT: one_line_sql,
+ attributes={
+ tracing.SpanAttributes.DB_SYSTEM: "sql",
+ tracing.SpanAttributes.DB_STATEMENT: one_line_sql,
},
):
return func(sql, *args, **kwargs)
@@ -701,15 +701,15 @@ class DatabasePool:
exception_callbacks=exception_callbacks,
)
try:
- with opentracing.start_active_span(
+ with tracing.start_active_span(
"db.txn",
- tags={
- opentracing.SynapseTags.DB_TXN_DESC: desc,
- opentracing.SynapseTags.DB_TXN_ID: name,
+ attributes={
+ tracing.SynapseTags.DB_TXN_DESC: desc,
+ tracing.SynapseTags.DB_TXN_ID: name,
},
):
r = func(cursor, *args, **kwargs)
- opentracing.log_kv({"message": "commit"})
+ tracing.log_kv({"message": "commit"})
conn.commit()
return r
except self.engine.module.OperationalError as e:
@@ -725,7 +725,7 @@ class DatabasePool:
if i < N:
i += 1
try:
- with opentracing.start_active_span("db.rollback"):
+ with tracing.start_active_span("db.rollback"):
conn.rollback()
except self.engine.module.Error as e1:
transaction_logger.warning("[TXN EROLL] {%s} %s", name, e1)
@@ -739,7 +739,7 @@ class DatabasePool:
if i < N:
i += 1
try:
- with opentracing.start_active_span("db.rollback"):
+ with tracing.start_active_span("db.rollback"):
conn.rollback()
except self.engine.module.Error as e1:
transaction_logger.warning(
@@ -845,7 +845,7 @@ class DatabasePool:
logger.warning("Starting db txn '%s' from sentinel context", desc)
try:
- with opentracing.start_active_span(f"db.{desc}"):
+ with tracing.start_active_span(f"db.{desc}"):
result = await self.runWithConnection(
self.new_transaction,
desc,
@@ -928,9 +928,7 @@ class DatabasePool:
with LoggingContext(
str(curr_context), parent_context=parent_context
) as context:
- with opentracing.start_active_span(
- operation_name="db.connection",
- ):
+ with tracing.start_active_span("db.connection"):
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)
@@ -944,15 +942,13 @@ class DatabasePool:
"Reconnecting database connection over transaction limit"
)
conn.reconnect()
- opentracing.log_kv(
- {"message": "reconnected due to txn limit"}
- )
+ tracing.log_kv({"message": "reconnected due to txn limit"})
self._txn_counters[tid] = 1
if self.engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()
- opentracing.log_kv({"message": "reconnected"})
+ tracing.log_kv({"message": "reconnected"})
if self._txn_limit > 0:
self._txn_counters[tid] = 1
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 73c95ffb6f..1503d74b1f 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -27,7 +27,7 @@ from typing import (
)
from synapse.logging import issue9533_logger
-from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.logging.tracing import log_kv, set_attribute, trace
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@@ -436,7 +436,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
(user_id, device_id), None
)
- set_tag("last_deleted_stream_id", str(last_deleted_stream_id))
+ set_attribute("last_deleted_stream_id", str(last_deleted_stream_id))
if last_deleted_stream_id:
has_changed = self._device_inbox_stream_cache.has_entity_changed(
@@ -485,10 +485,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
A list of messages for the device and where in the stream the messages got to.
"""
- set_tag("destination", destination)
- set_tag("last_stream_id", last_stream_id)
- set_tag("current_stream_id", current_stream_id)
- set_tag("limit", limit)
+ set_attribute("destination", destination)
+ set_attribute("last_stream_id", last_stream_id)
+ set_attribute("current_stream_id", current_stream_id)
+ set_attribute("limit", limit)
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
destination, last_stream_id
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index ca0fe8c4be..7ceb7a202b 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -30,11 +30,11 @@ from typing import (
from typing_extensions import Literal
-from synapse.api.constants import EduTypes
+from synapse.api.constants import EduTypes, EventContentFields
from synapse.api.errors import Codes, StoreError
-from synapse.logging.opentracing import (
+from synapse.logging.tracing import (
get_active_span_text_map,
- set_tag,
+ set_attribute,
trace,
whitelisted_homeserver,
)
@@ -333,12 +333,12 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
# (user_id, device_id) entries into a map, with the value being
# the max stream_id across each set of duplicate entries
#
- # maps (user_id, device_id) -> (stream_id, opentracing_context)
+ # maps (user_id, device_id) -> (stream_id, tracing_context)
#
- # opentracing_context contains the opentracing metadata for the request
+ # tracing_context contains the opentelemetry metadata for the request
# that created the poke
#
- # The most recent request's opentracing_context is used as the
+ # The most recent request's tracing_context is used as the
# context which created the Edu.
# This is the stream ID that we will return for the consumer to resume
@@ -401,8 +401,8 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
if update_stream_id > previous_update_stream_id:
# FIXME If this overwrites an older update, this discards the
- # previous OpenTracing context.
- # It might make it harder to track down issues using OpenTracing.
+ # previous tracing context.
+ # It might make it harder to track down issues using tracing.
# If there's a good reason why it doesn't matter, a comment here
# about that would not hurt.
query_map[key] = (update_stream_id, update_context)
@@ -468,11 +468,11 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
- user_id
- device_id
- stream_id
- - opentracing_context
+ - tracing_context
"""
# get the list of device updates that need to be sent
sql = """
- SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
+ SELECT user_id, device_id, stream_id, tracing_context FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id
LIMIT ?
@@ -493,7 +493,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
destination: The host the device updates are intended for
from_stream_id: The minimum stream_id to filter updates by, exclusive
query_map: Dictionary mapping (user_id, device_id) to
- (update stream_id, the relevant json-encoded opentracing context)
+ (update stream_id, the relevant json-encoded tracing context)
Returns:
List of objects representing a device update EDU.
@@ -531,13 +531,13 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
for device_id in device_ids:
device = user_devices[device_id]
- stream_id, opentracing_context = query_map[(user_id, device_id)]
+ stream_id, tracing_context = query_map[(user_id, device_id)]
result = {
"user_id": user_id,
"device_id": device_id,
"prev_id": [prev_id] if prev_id else [],
"stream_id": stream_id,
- "org.matrix.opentracing_context": opentracing_context,
+ EventContentFields.TRACING_CONTEXT: tracing_context,
}
prev_id = stream_id
@@ -706,8 +706,8 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
else:
results[user_id] = await self.get_cached_devices_for_user(user_id)
- set_tag("in_cache", str(results))
- set_tag("not_in_cache", str(user_ids_not_in_cache))
+ set_attribute("in_cache", str(results))
+ set_attribute("not_in_cache", str(user_ids_not_in_cache))
return user_ids_not_in_cache, results
@@ -1801,7 +1801,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
"device_id",
"sent",
"ts",
- "opentracing_context",
+ "tracing_context",
),
values=[
(
@@ -1846,7 +1846,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
"room_id",
"stream_id",
"converted_to_destinations",
- "opentracing_context",
+ "tracing_context",
),
values=[
(
@@ -1870,11 +1870,11 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
written to `device_lists_outbound_pokes`.
Returns:
- A list of user ID, device ID, room ID, stream ID and optional opentracing context.
+ A list of user ID, device ID, room ID, stream ID and optional opentelemetry context.
"""
sql = """
- SELECT user_id, device_id, room_id, stream_id, opentracing_context
+ SELECT user_id, device_id, room_id, stream_id, tracing_context
FROM device_lists_changes_in_room
WHERE NOT converted_to_destinations
ORDER BY stream_id
@@ -1892,9 +1892,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
device_id,
room_id,
stream_id,
- db_to_json(opentracing_context),
+ db_to_json(tracing_context),
)
- for user_id, device_id, room_id, stream_id, opentracing_context in txn
+ for user_id, device_id, room_id, stream_id, tracing_context in txn
]
return await self.db_pool.runInteraction(
diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index af59be6b48..6d565102ac 100644
--- a/synapse/storage/databases/main/e2e_room_keys.py
+++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -18,7 +18,7 @@ from typing import Dict, Iterable, Mapping, Optional, Tuple, cast
from typing_extensions import Literal, TypedDict
from synapse.api.errors import StoreError
-from synapse.logging.opentracing import log_kv, trace
+from synapse.logging.tracing import log_kv, trace
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.types import JsonDict, JsonSerializable, StreamKeyType
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 46c0d06157..2df8101390 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -36,7 +36,7 @@ from synapse.appservice import (
TransactionOneTimeKeyCounts,
TransactionUnusedFallbackKeys,
)
-from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.logging.tracing import log_kv, set_attribute, trace
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
DatabasePool,
@@ -146,7 +146,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
key data. The key data will be a dict in the same format as the
DeviceKeys type returned by POST /_matrix/client/r0/keys/query.
"""
- set_tag("query_list", str(query_list))
+ set_attribute("query_list", str(query_list))
if not query_list:
return {}
@@ -228,8 +228,8 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
Dict mapping from user-id to dict mapping from device_id to
key data.
"""
- set_tag("include_all_devices", include_all_devices)
- set_tag("include_deleted_devices", include_deleted_devices)
+ set_attribute("include_all_devices", include_all_devices)
+ set_attribute("include_deleted_devices", include_deleted_devices)
result = await self.db_pool.runInteraction(
"get_e2e_device_keys",
@@ -416,9 +416,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
"""
def _add_e2e_one_time_keys(txn: LoggingTransaction) -> None:
- set_tag("user_id", user_id)
- set_tag("device_id", device_id)
- set_tag("new_keys", str(new_keys))
+ set_attribute("user_id", user_id)
+ set_attribute("device_id", device_id)
+ set_attribute("new_keys", str(new_keys))
# We are protected from race between lookup and insertion due to
# a unique constraint. If there is a race of two calls to
# `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -1158,10 +1158,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"""
def _set_e2e_device_keys_txn(txn: LoggingTransaction) -> bool:
- set_tag("user_id", user_id)
- set_tag("device_id", device_id)
- set_tag("time_now", time_now)
- set_tag("device_keys", str(device_keys))
+ set_attribute("user_id", user_id)
+ set_attribute("device_id", device_id)
+ set_attribute("time_now", time_now)
+ set_attribute("device_keys", str(device_keys))
old_key_json = self.db_pool.simple_select_one_onecol_txn(
txn,
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index a9a88c8bfd..dd187f7422 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -75,6 +75,8 @@ Changes in SCHEMA_VERSION = 71:
Changes in SCHEMA_VERSION = 72:
- event_edges.(room_id, is_state) are no longer written to.
- Tables related to groups are dropped.
+ - Rename column in `device_lists_outbound_pokes` and `device_lists_changes_in_room`
+ from `opentracing_context` to generalized `tracing_context`.
"""
diff --git a/synapse/storage/schema/main/delta/72/04rename_opentelemetry_tracing_context.sql b/synapse/storage/schema/main/delta/72/04rename_opentelemetry_tracing_context.sql
new file mode 100644
index 0000000000..ae904863f8
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/04rename_opentelemetry_tracing_context.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Rename to generalized `tracing_context` since we're moving from opentracing to opentelemetry
+ALTER TABLE device_lists_outbound_pokes RENAME COLUMN opentracing_context TO tracing_context;
+ALTER TABLE device_lists_changes_in_room RENAME COLUMN opentracing_context TO tracing_context;
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index a3eb5f741b..1dd2d3e62e 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -29,11 +29,7 @@ import attr
from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background
-from synapse.logging.opentracing import (
- active_span,
- start_active_span,
- start_active_span_follows_from,
-)
+from synapse.logging.tracing import Link, get_active_span, start_active_span
from synapse.util import Clock
from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred
from synapse.util.caches import register_cache
@@ -41,7 +37,7 @@ from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
- import opentracing
+ import opentelemetry
# the type of the key in the cache
KV = TypeVar("KV")
@@ -82,8 +78,8 @@ class ResponseCacheEntry:
easier to cache Failure results.
"""
- opentracing_span_context: "Optional[opentracing.SpanContext]"
- """The opentracing span which generated/is generating the result"""
+ tracing_span_context: Optional["opentelemetry.trace.SpanContext"]
+ """The tracing span which generated/is generating the result"""
class ResponseCache(Generic[KV]):
@@ -141,7 +137,7 @@ class ResponseCache(Generic[KV]):
self,
context: ResponseCacheContext[KV],
deferred: "defer.Deferred[RV]",
- opentracing_span_context: "Optional[opentracing.SpanContext]",
+ tracing_span_context: Optional["opentelemetry.trace.SpanContext"],
) -> ResponseCacheEntry:
"""Set the entry for the given key to the given deferred.
@@ -152,14 +148,14 @@ class ResponseCache(Generic[KV]):
Args:
context: Information about the cache miss
deferred: The deferred which resolves to the result.
- opentracing_span_context: An opentracing span wrapping the calculation
+ tracing_span_context: A tracing span wrapping the calculation
Returns:
The cache entry object.
"""
result = ObservableDeferred(deferred, consumeErrors=True)
key = context.cache_key
- entry = ResponseCacheEntry(result, opentracing_span_context)
+ entry = ResponseCacheEntry(result, tracing_span_context)
self._result_cache[key] = entry
def on_complete(r: RV) -> RV:
@@ -234,15 +230,15 @@ class ResponseCache(Generic[KV]):
if cache_context:
kwargs["cache_context"] = context
- span_context: Optional[opentracing.SpanContext] = None
+ span_context: Optional["opentelemetry.trace.SpanContext"] = None
async def cb() -> RV:
# NB it is important that we do not `await` before setting span_context!
nonlocal span_context
with start_active_span(f"ResponseCache[{self._name}].calculate"):
- span = active_span()
+ span = get_active_span()
if span:
- span_context = span.context
+ span_context = span.get_span_context()
return await callback(*args, **kwargs)
d = run_in_background(cb)
@@ -257,9 +253,9 @@ class ResponseCache(Generic[KV]):
"[%s]: using incomplete cached result for [%s]", self._name, key
)
- span_context = entry.opentracing_span_context
- with start_active_span_follows_from(
+ span_context = entry.tracing_span_context
+ with start_active_span(
f"ResponseCache[{self._name}].wait",
- contexts=(span_context,) if span_context else (),
+ links=[Link(span_context)] if span_context else None,
):
return await make_deferred_yieldable(result)
|