summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py19
-rw-r--r--synapse/api/constants.py3
-rw-r--r--synapse/app/_base.py2
-rw-r--r--synapse/config/tracer.py43
-rw-r--r--synapse/federation/federation_server.py4
-rw-r--r--synapse/federation/sender/per_destination_queue.py4
-rw-r--r--synapse/federation/sender/transaction_manager.py31
-rw-r--r--synapse/federation/transport/server/_base.py102
-rw-r--r--synapse/federation/units.py11
-rw-r--r--synapse/handlers/device.py32
-rw-r--r--synapse/handlers/devicemessage.py12
-rw-r--r--synapse/handlers/e2e_keys.py26
-rw-r--r--synapse/handlers/e2e_room_keys.py2
-rw-r--r--synapse/handlers/sync.py23
-rw-r--r--synapse/http/client.py20
-rw-r--r--synapse/http/matrixfederationclient.py26
-rw-r--r--synapse/http/server.py24
-rw-r--r--synapse/http/site.py40
-rw-r--r--synapse/logging/context.py9
-rw-r--r--synapse/logging/opentracing.py972
-rw-r--r--synapse/logging/scopecontextmanager.py171
-rw-r--r--synapse/logging/tracing.py942
-rw-r--r--synapse/metrics/background_process_metrics.py7
-rw-r--r--synapse/notifier.py6
-rw-r--r--synapse/push/httppusher.py6
-rw-r--r--synapse/replication/http/_base.py8
-rw-r--r--synapse/replication/tcp/external_cache.py10
-rw-r--r--synapse/rest/client/keys.py8
-rw-r--r--synapse/rest/client/knock.py4
-rw-r--r--synapse/rest/client/room.py22
-rw-r--r--synapse/rest/client/sendtodevice.py6
-rw-r--r--synapse/rest/client/sync.py2
-rw-r--r--synapse/storage/controllers/persist_events.py45
-rw-r--r--synapse/storage/database.py36
-rw-r--r--synapse/storage/databases/main/deviceinbox.py12
-rw-r--r--synapse/storage/databases/main/devices.py42
-rw-r--r--synapse/storage/databases/main/e2e_room_keys.py2
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py22
-rw-r--r--synapse/storage/schema/__init__.py2
-rw-r--r--synapse/storage/schema/main/delta/72/04rename_opentelemtetry_tracing_context.sql18
-rw-r--r--synapse/util/caches/response_cache.py30
41 files changed, 1337 insertions, 1469 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py

index 523bad0c55..61da585ad0 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py
@@ -31,9 +31,9 @@ from synapse.api.errors import ( from synapse.appservice import ApplicationService from synapse.http import get_request_user_agent from synapse.http.site import SynapseRequest -from synapse.logging.opentracing import ( - active_span, +from synapse.logging.tracing import ( force_tracing, + get_active_span, start_active_span, trace, ) @@ -141,7 +141,7 @@ class Auth: is invalid. AuthError if access is denied for the user in the access token """ - parent_span = active_span() + parent_span = get_active_span() with start_active_span("get_user_by_req"): requester = await self._wrapped_get_user_by_req( request, allow_guest, allow_expired @@ -151,19 +151,18 @@ class Auth: if requester.authenticated_entity in self._force_tracing_for_users: # request tracing is enabled for this user, so we need to force it # tracing on for the parent span (which will be the servlet span). - # + force_tracing(parent_span) # It's too late for the get_user_by_req span to inherit the setting, # so we also force it on for that. force_tracing() - force_tracing(parent_span) - parent_span.set_tag( + parent_span.set_attribute( "authenticated_entity", requester.authenticated_entity ) - parent_span.set_tag("user_id", requester.user.to_string()) + parent_span.set_attribute("user_id", requester.user.to_string()) if requester.device_id is not None: - parent_span.set_tag("device_id", requester.device_id) + parent_span.set_attribute("device_id", requester.device_id) if requester.app_service is not None: - parent_span.set_tag("appservice_id", requester.app_service.id) + parent_span.set_attribute("appservice_id", requester.app_service.id) return requester async def _wrapped_get_user_by_req( @@ -174,7 +173,7 @@ class Auth: ) -> Requester: """Helper for get_user_by_req - Once get_user_by_req has set up the opentracing span, this does the actual work. + Once get_user_by_req has set up the tracing span, this does the actual work. 
""" try: ip_addr = request.getClientAddress().host diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 789859e69e..fc04e4d4bd 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py
@@ -193,6 +193,9 @@ class LimitBlockingTypes: class EventContentFields: """Fields found in events' content, regardless of type.""" + # Synapse internal content field for tracing + TRACING_CONTEXT: Final = "org.matrix.tracing_context" + # Labels for the event, cf https://github.com/matrix-org/matrix-doc/pull/2326 LABELS: Final = "org.matrix.labels" diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 923891ae0d..5763352b29 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py
@@ -62,7 +62,7 @@ from synapse.events.spamcheck import load_legacy_spam_checkers from synapse.events.third_party_rules import load_legacy_third_party_event_rules from synapse.handlers.auth import load_legacy_password_auth_providers from synapse.logging.context import PreserveLoggingContext -from synapse.logging.opentracing import init_tracer +from synapse.logging.tracing import init_tracer from synapse.metrics import install_gc_manager, register_threadpool from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.metrics.jemalloc import setup_jemalloc_stats diff --git a/synapse/config/tracer.py b/synapse/config/tracer.py
index c19270c6c5..d67498f50d 100644 --- a/synapse/config/tracer.py +++ b/synapse/config/tracer.py
@@ -24,41 +24,50 @@ class TracerConfig(Config): section = "tracing" def read_config(self, config: JsonDict, **kwargs: Any) -> None: - opentracing_config = config.get("opentracing") - if opentracing_config is None: - opentracing_config = {} + tracing_config = config.get("tracing") + if tracing_config is None: + tracing_config = {} - self.opentracer_enabled = opentracing_config.get("enabled", False) + self.tracing_enabled = tracing_config.get("enabled", False) - self.jaeger_config = opentracing_config.get( - "jaeger_config", - {"sampler": {"type": "const", "param": 1}, "logging": False}, + self.jaeger_exporter_config = tracing_config.get( + "jaeger_exporter_config", + {}, ) self.force_tracing_for_users: Set[str] = set() - if not self.opentracer_enabled: + if not self.tracing_enabled: return - check_requirements("opentracing") + check_requirements("opentelemetry") # The tracer is enabled so sanitize the config - self.opentracer_whitelist: List[str] = opentracing_config.get( + # Default to always sample. 
Range: [0.0 - 1.0] + self.sample_rate: float = float(tracing_config.get("sample_rate", 1)) + if self.sample_rate < 0.0 or self.sample_rate > 1.0: + raise ConfigError( + "Tracing sample_rate must be in range [0.0, 1.0].", + ("tracing", "sample_rate"), + ) + + self.homeserver_whitelist: List[str] = tracing_config.get( "homeserver_whitelist", [] ) - if not isinstance(self.opentracer_whitelist, list): - raise ConfigError("Tracer homeserver_whitelist config is malformed") - - force_tracing_for_users = opentracing_config.get("force_tracing_for_users", []) - if not isinstance(force_tracing_for_users, list): + if not isinstance(self.homeserver_whitelist, list): raise ConfigError( - "Expected a list", ("opentracing", "force_tracing_for_users") + "Tracing homeserver_whitelist config is malformed", + ("tracing", "homeserver_whitelist"), ) + + force_tracing_for_users = tracing_config.get("force_tracing_for_users", []) + if not isinstance(force_tracing_for_users, list): + raise ConfigError("Expected a list", ("tracing", "force_tracing_for_users")) for i, u in enumerate(force_tracing_for_users): if not isinstance(u, str): raise ConfigError( "Expected a string", - ("opentracing", "force_tracing_for_users", f"index {i}"), + ("tracing", "force_tracing_for_users", f"index {i}"), ) self.force_tracing_for_users.add(u) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 1d60137411..c9cd02ebeb 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -61,7 +61,7 @@ from synapse.logging.context import ( nested_logging_context, run_in_background, ) -from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace +from synapse.logging.tracing import log_kv, start_active_span_from_edu, trace from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, @@ -1399,7 +1399,7 @@ class FederationHandlerRegistry: # Check if we have a handler on this instance handler = self.edu_handlers.get(edu_type) if handler: - with start_active_span_from_edu(content, "handle_edu"): + with start_active_span_from_edu("handle_edu", edu_content=content): try: await handler(origin, content) except SynapseError as e: diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 41d8b937af..72bc935452 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -32,7 +32,7 @@ from synapse.events import EventBase from synapse.federation.units import Edu from synapse.handlers.presence import format_user_presence_state from synapse.logging import issue9533_logger -from synapse.logging.opentracing import SynapseTags, set_tag +from synapse.logging.tracing import SynapseTags, set_attribute from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import ReadReceipt @@ -596,7 +596,7 @@ class PerDestinationQueue: if not message_id: continue - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) edus = [ Edu( diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 75081810fd..3f2c8bcfa1 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py
@@ -21,11 +21,13 @@ from synapse.api.errors import HttpResponseException from synapse.events import EventBase from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction -from synapse.logging.opentracing import ( +from synapse.logging.tracing import ( + Link, + StatusCode, extract_text_map, - set_tag, - start_active_span_follows_from, - tags, + get_span_context_from_context, + set_status, + start_active_span, whitelisted_homeserver, ) from synapse.types import JsonDict @@ -79,7 +81,7 @@ class TransactionManager: edus: List of EDUs to send """ - # Make a transaction-sending opentracing span. This span follows on from + # Make a transaction-sending tracing span. This span follows on from # all the edus in that transaction. This needs to be done since there is # no active span here, so if the edus were not received by the remote the # span would have no causality and it would be forgotten. @@ -88,13 +90,20 @@ class TransactionManager: keep_destination = whitelisted_homeserver(destination) for edu in edus: - context = edu.get_context() - if context: - span_contexts.append(extract_text_map(json_decoder.decode(context))) + tracing_context_json = edu.get_tracing_context_json() + if tracing_context_json: + context = extract_text_map(json_decoder.decode(tracing_context_json)) + if context: + span_context = get_span_context_from_context(context) + if span_context: + span_contexts.append(span_context) if keep_destination: - edu.strip_context() + edu.strip_tracing_context() - with start_active_span_follows_from("send_transaction", span_contexts): + with start_active_span( + "send_transaction", + links=[Link(span_context) for span_context in span_contexts], + ): logger.debug("TX [%s] _attempt_new_transaction", destination) txn_id = str(self._next_txn_id) @@ -166,7 +175,7 @@ class TransactionManager: except HttpResponseException as e: code = e.code - set_tag(tags.ERROR, True) + set_status(StatusCode.ERROR, e) 
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) raise diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index bb0f8d6b7b..9425106a3f 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py
@@ -15,7 +15,6 @@ import functools import logging import re -import time from http import HTTPStatus from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Tuple, cast @@ -25,12 +24,15 @@ from synapse.http.server import HttpServer, ServletCallback, is_method_cancellab from synapse.http.servlet import parse_json_object_from_request from synapse.http.site import SynapseRequest from synapse.logging.context import run_in_background -from synapse.logging.opentracing import ( - active_span, - set_tag, - span_context_from_request, +from synapse.logging.tracing import ( + Link, + context_from_request, + create_non_recording_span, + get_active_span, + set_attribute, start_active_span, - start_active_span_follows_from, + start_span, + use_span, whitelisted_homeserver, ) from synapse.types import JsonDict @@ -308,60 +310,70 @@ class BaseFederationServlet: logger.warning("authenticate_request failed: %s", e) raise - # update the active opentracing span with the authenticated entity - set_tag("authenticated_entity", str(origin)) + # update the active tracing span with the authenticated entity + set_attribute("authenticated_entity", str(origin)) # if the origin is authenticated and whitelisted, use its span context # as the parent. - context = None + origin_context = None if origin and whitelisted_homeserver(origin): - context = span_context_from_request(request) - - if context: - servlet_span = active_span() - # a scope which uses the origin's context as a parent - processing_start_time = time.time() - scope = start_active_span_follows_from( + origin_context = context_from_request(request) + + if origin_context: + local_servlet_span = get_active_span() + # Create a span which uses the `origin_context` as a parent + # so we can see how the incoming payload was processed while + # we're looking at the outgoing trace. 
Since the parent is set + # to a remote span (from the origin), it won't show up in the + # local trace which is why we create another span below for the + # local trace. A span can only have one parent so we have to + # create two separate ones. + remote_parent_span = start_span( "incoming-federation-request", - child_of=context, - contexts=(servlet_span,), - start_time=processing_start_time, + context=origin_context, + # Cross-link back to the local trace so we can jump + # to the incoming side from the remote origin trace. + links=[Link(local_servlet_span.get_span_context())] + if local_servlet_span + else None, ) + # Create a local span to appear in the local trace + local_parent_span_cm = start_active_span( + "process-federation-request", + # Cross-link back to the remote outgoing trace so we can + # jump over there. + links=[Link(remote_parent_span.get_span_context())], + ) else: - # just use our context as a parent - scope = start_active_span( - "incoming-federation-request", + # Otherwise just use our local active servlet context as a parent + local_parent_span_cm = start_active_span( + "process-federation-request", ) - try: - with scope: - if origin and self.RATELIMIT: - with ratelimiter.ratelimit(origin) as d: - await d - if request._disconnected: - logger.warning( - "client disconnected before we started processing " - "request" - ) - return None - response = await func( - origin, content, request.args, *args, **kwargs + # Don't need to record anything for the remote because no remote + # trace context given. 
+ remote_parent_span = create_non_recording_span() + + remote_parent_span_cm = use_span(remote_parent_span, end_on_exit=True) + + with remote_parent_span_cm, local_parent_span_cm: + if origin and self.RATELIMIT: + with ratelimiter.ratelimit(origin) as d: + await d + if request._disconnected: + logger.warning( + "client disconnected before we started processing " + "request" ) - else: + return None response = await func( origin, content, request.args, *args, **kwargs ) - finally: - # if we used the origin's context as the parent, add a new span using - # the servlet span as a parent, so that we have a link - if context: - scope2 = start_active_span_follows_from( - "process-federation_request", - contexts=(scope.span,), - start_time=processing_start_time, + else: + response = await func( + origin, content, request.args, *args, **kwargs ) - scope2.close() return response diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index b9b12fbea5..a6b590269e 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py
@@ -21,6 +21,7 @@ from typing import List, Optional import attr +from synapse.api.constants import EventContentFields from synapse.types import JsonDict logger = logging.getLogger(__name__) @@ -54,11 +55,13 @@ class Edu: "destination": self.destination, } - def get_context(self) -> str: - return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}") + def get_tracing_context_json(self) -> str: + return getattr(self, "content", {}).get( + EventContentFields.TRACING_CONTEXT, "{}" + ) - def strip_context(self) -> None: - getattr(self, "content", {})["org.matrix.opentracing_context"] = "{}" + def strip_tracing_context(self) -> None: + getattr(self, "content", {})[EventContentFields.TRACING_CONTEXT] = "{}" def _none_to_list(edus: Optional[List[JsonDict]]) -> List[JsonDict]: diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 1a8379854c..659ee0ef5e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -36,7 +36,7 @@ from synapse.api.errors import ( RequestSendFailed, SynapseError, ) -from synapse.logging.opentracing import log_kv, set_tag, trace +from synapse.logging.tracing import log_kv, set_attribute, trace from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, @@ -86,7 +86,7 @@ class DeviceWorkerHandler: info on each device """ - set_tag("user_id", user_id) + set_attribute("user_id", user_id) device_map = await self.store.get_devices_by_user(user_id) ips = await self.store.get_last_client_ip_by_device(user_id, device_id=None) @@ -118,8 +118,8 @@ class DeviceWorkerHandler: ips = await self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) - set_tag("device", str(device)) - set_tag("ips", str(ips)) + set_attribute("device", str(device)) + set_attribute("ips", str(ips)) return device @@ -169,8 +169,8 @@ class DeviceWorkerHandler: joined a room, that `user_id` may be interested in. 
""" - set_tag("user_id", user_id) - set_tag("from_token", str(from_token)) + set_attribute("user_id", user_id) + set_attribute("from_token", str(from_token)) now_room_key = self.store.get_room_max_token() room_ids = await self.store.get_rooms_for_user(user_id) @@ -461,8 +461,8 @@ class DeviceHandler(DeviceWorkerHandler): except errors.StoreError as e: if e.code == 404: # no match - set_tag("error", True) - set_tag("reason", "User doesn't have that device id.") + set_attribute("error", True) + set_attribute("reason", "User doesn't have that device id.") else: raise @@ -688,7 +688,7 @@ class DeviceHandler(DeviceWorkerHandler): else: return - for user_id, device_id, room_id, stream_id, opentracing_context in rows: + for user_id, device_id, room_id, stream_id, tracing_context in rows: hosts = set() # Ignore any users that aren't ours @@ -707,7 +707,7 @@ class DeviceHandler(DeviceWorkerHandler): room_id=room_id, stream_id=stream_id, hosts=hosts, - context=opentracing_context, + context=tracing_context, ) # Notify replication that we've updated the device list stream. @@ -794,8 +794,8 @@ class DeviceListUpdater: for parsing the EDU and adding to pending updates list. """ - set_tag("origin", origin) - set_tag("edu_content", str(edu_content)) + set_attribute("origin", origin) + set_attribute("edu_content", str(edu_content)) user_id = edu_content.pop("user_id") device_id = edu_content.pop("device_id") stream_id = str(edu_content.pop("stream_id")) # They may come as ints @@ -815,7 +815,7 @@ class DeviceListUpdater: origin, ) - set_tag("error", True) + set_attribute("error", True) log_kv( { "message": "Got a device list update edu from a user and " @@ -830,7 +830,7 @@ class DeviceListUpdater: if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. 
- set_tag("error", True) + set_attribute("error", True) log_kv( { "message": "Got an update from a user for which " @@ -1027,12 +1027,12 @@ class DeviceListUpdater: # eventually become consistent. return None except FederationDeniedError as e: - set_tag("error", True) + set_attribute("error", True) log_kv({"reason": "FederationDeniedError"}) logger.info(e) return None except Exception as e: - set_tag("error", True) + set_attribute("error", True) log_kv( {"message": "Exception raised by federation request", "exception": e} ) diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 444c08bc2e..9c9da0cb63 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py
@@ -15,15 +15,15 @@ import logging from typing import TYPE_CHECKING, Any, Dict -from synapse.api.constants import EduTypes, ToDeviceEventTypes +from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes from synapse.api.errors import SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.logging.context import run_in_background -from synapse.logging.opentracing import ( +from synapse.logging.tracing import ( SynapseTags, get_active_span_text_map, log_kv, - set_tag, + set_attribute, ) from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id @@ -217,10 +217,10 @@ class DeviceMessageHandler: sender_user_id = requester.user.to_string() message_id = random_string(16) - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) log_kv({"number_of_to_device_messages": len(messages)}) - set_tag("sender", sender_user_id) + set_attribute("sender", sender_user_id) local_messages = {} remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {} for user_id, by_device in messages.items(): @@ -273,7 +273,7 @@ class DeviceMessageHandler: "sender": sender_user_id, "type": message_type, "message_id": message_id, - "org.matrix.opentracing_context": json_encoder.encode(context), + EventContentFields.TRACING_CONTEXT: json_encoder.encode(context), } # Add messages to the database. diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index c938339ddd..a3692f00d9 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py
@@ -28,7 +28,7 @@ from twisted.internet import defer from synapse.api.constants import EduTypes from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace +from synapse.logging.tracing import log_kv, set_attribute, tag_args, trace from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.types import ( JsonDict, @@ -138,8 +138,8 @@ class E2eKeysHandler: else: remote_queries[user_id] = device_ids - set_tag("local_key_query", str(local_query)) - set_tag("remote_key_query", str(remote_queries)) + set_attribute("local_key_query", str(local_query)) + set_attribute("remote_key_query", str(remote_queries)) # First get local devices. # A map of destination -> failure response. @@ -342,8 +342,8 @@ class E2eKeysHandler: except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure - set_tag("error", True) - set_tag("reason", str(failure)) + set_attribute("error", True) + set_attribute("reason", str(failure)) return @@ -405,7 +405,7 @@ class E2eKeysHandler: Returns: A map from user_id -> device_id -> device details """ - set_tag("local_query", str(query)) + set_attribute("local_query", str(query)) local_query: List[Tuple[str, Optional[str]]] = [] result_dict: Dict[str, Dict[str, dict]] = {} @@ -420,7 +420,7 @@ class E2eKeysHandler: "user_id": user_id, } ) - set_tag("error", True) + set_attribute("error", True) raise SynapseError(400, "Not a user here") if not device_ids: @@ -477,8 +477,8 @@ class E2eKeysHandler: domain = get_domain_from_id(user_id) remote_queries.setdefault(domain, {})[user_id] = one_time_keys - set_tag("local_key_query", str(local_query)) - set_tag("remote_key_query", str(remote_queries)) + set_attribute("local_key_query", str(local_query)) + set_attribute("remote_key_query", str(remote_queries)) results = await 
self.store.claim_e2e_one_time_keys(local_query) @@ -494,7 +494,7 @@ class E2eKeysHandler: @trace async def claim_client_keys(destination: str) -> None: - set_tag("destination", destination) + set_attribute("destination", destination) device_keys = remote_queries[destination] try: remote_result = await self.federation.claim_client_keys( @@ -507,8 +507,8 @@ class E2eKeysHandler: except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure - set_tag("error", True) - set_tag("reason", str(failure)) + set_attribute("error", True) + set_attribute("reason", str(failure)) await make_deferred_yieldable( defer.gatherResults( @@ -611,7 +611,7 @@ class E2eKeysHandler: result = await self.store.count_e2e_one_time_keys(user_id, device_id) - set_tag("one_time_key_counts", str(result)) + set_attribute("one_time_key_counts", str(result)) return {"one_time_key_counts": result} async def _upload_one_time_keys_for_user( diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 28dc08c22a..8786534e54 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py
@@ -25,7 +25,7 @@ from synapse.api.errors import ( StoreError, SynapseError, ) -from synapse.logging.opentracing import log_kv, trace +from synapse.logging.tracing import log_kv, trace from synapse.storage.databases.main.e2e_room_keys import RoomKey from synapse.types import JsonDict from synapse.util.async_helpers import Linearizer diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d42a414c90..8dc05d648d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -25,7 +25,12 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase from synapse.handlers.relations import BundledAggregations from synapse.logging.context import current_context -from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span +from synapse.logging.tracing import ( + SynapseTags, + log_kv, + set_attribute, + start_active_span, +) from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.databases.main.event_push_actions import NotifCounts from synapse.storage.roommember import MemberSummary @@ -391,12 +396,12 @@ class SyncHandler: indoctrination. """ with start_active_span("sync.current_sync_for_user"): - log_kv({"since_token": since_token}) + log_kv({"since_token": str(since_token)}) sync_result = await self.generate_sync_result( sync_config, since_token, full_state ) - set_tag(SynapseTags.SYNC_RESULT, bool(sync_result)) + set_attribute(SynapseTags.SYNC_RESULT, bool(sync_result)) return sync_result async def push_rules_for_user(self, user: UserID) -> Dict[str, Dict[str, list]]: @@ -1084,7 +1089,7 @@ class SyncHandler: # to query up to a given point. 
# Always use the `now_token` in `SyncResultBuilder` now_token = self.event_sources.get_current_token() - log_kv({"now_token": now_token}) + log_kv({"now_token": str(now_token)}) logger.debug( "Calculating sync response for %r between %s and %s", @@ -1337,7 +1342,7 @@ class SyncHandler: # `/sync` message_id = message.pop("message_id", None) if message_id: - set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + set_attribute(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) logger.debug( "Returning %d to-device messages between %d and %d (current token: %d)", @@ -1997,13 +2002,13 @@ class SyncHandler: upto_token = room_builder.upto_token with start_active_span("sync.generate_room_entry"): - set_tag("room_id", room_id) + set_attribute("room_id", room_id) log_kv({"events": len(events or ())}) log_kv( { - "since_token": since_token, - "upto_token": upto_token, + "since_token": str(since_token), + "upto_token": str(upto_token), } ) @@ -2018,7 +2023,7 @@ class SyncHandler: log_kv( { "batch_events": len(batch.events), - "prev_batch": batch.prev_batch, + "prev_batch": str(batch.prev_batch), "batch_limited": batch.limited, } ) diff --git a/synapse/http/client.py b/synapse/http/client.py
index 084d0a5b84..89bd403312 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py
@@ -75,7 +75,13 @@ from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_u from synapse.http.proxyagent import ProxyAgent from synapse.http.types import QueryParams from synapse.logging.context import make_deferred_yieldable -from synapse.logging.opentracing import set_tag, start_active_span, tags +from synapse.logging.tracing import ( + SpanAttributes, + SpanKind, + StatusCode, + set_status, + start_active_span, +) from synapse.types import ISynapseReactor from synapse.util import json_decoder from synapse.util.async_helpers import timeout_deferred @@ -402,12 +408,11 @@ class SimpleHttpClient: with start_active_span( "outgoing-client-request", - tags={ - tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, - tags.HTTP_METHOD: method, - tags.HTTP_URL: uri, + kind=SpanKind.CLIENT, + attributes={ + SpanAttributes.HTTP_METHOD: method, + SpanAttributes.HTTP_URL: uri, }, - finish_on_close=True, ): try: body_producer = None @@ -459,8 +464,7 @@ class SimpleHttpClient: type(e).__name__, e.args[0], ) - set_tag(tags.ERROR, True) - set_tag("error_reason", e.args[0]) + set_status(StatusCode.ERROR, e) raise async def post_urlencoded_get_json( diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 3c35b1d2c7..00704a6a7c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py
@@ -72,9 +72,14 @@ from synapse.http.client import ( ) from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.http.types import QueryParams -from synapse.logging import opentracing +from synapse.logging import tracing from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.logging.opentracing import set_tag, start_active_span, tags +from synapse.logging.tracing import ( + SpanAttributes, + SpanKind, + set_attribute, + start_active_span, +) from synapse.types import JsonDict from synapse.util import json_decoder from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred @@ -517,18 +522,19 @@ class MatrixFederationHttpClient: scope = start_active_span( "outgoing-federation-request", - tags={ - tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, - tags.PEER_ADDRESS: request.destination, - tags.HTTP_METHOD: request.method, - tags.HTTP_URL: request.path, + kind=SpanKind.CLIENT, + attributes={ + SpanAttributes.HTTP_HOST: request.destination, + SpanAttributes.HTTP_METHOD: request.method, + SpanAttributes.HTTP_URL: request.path, }, - finish_on_close=True, ) # Inject the span into the headers headers_dict: Dict[bytes, List[bytes]] = {} - opentracing.inject_header_dict(headers_dict, request.destination) + tracing.inject_active_tracing_context_into_header_dict( + headers_dict, request.destination + ) headers_dict[b"User-Agent"] = [self.version_string_bytes] @@ -614,7 +620,7 @@ class MatrixFederationHttpClient: request.method, response.code ).inc() - set_tag(tags.HTTP_STATUS_CODE, response.code) + set_attribute(SpanAttributes.HTTP_STATUS_CODE, response.code) response_phrase = response.phrase.decode("ascii", errors="replace") if 200 <= response.code < 300: diff --git a/synapse/http/server.py b/synapse/http/server.py
index 19f42159b8..6420c0837b 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py
@@ -61,13 +61,13 @@ from synapse.api.errors import ( from synapse.config.homeserver import HomeServerConfig from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background -from synapse.logging.opentracing import active_span, start_active_span, trace_servlet +from synapse.logging.tracing import get_active_span, start_active_span, trace_servlet from synapse.util import json_encoder from synapse.util.caches import intern_dict from synapse.util.iterutils import chunk_seq if TYPE_CHECKING: - import opentracing + import opentelemetry from synapse.server import HomeServer @@ -329,7 +329,7 @@ class HttpServer(Protocol): subsequent arguments will be any matched groups from the regex. This should return either tuple of (code, response), or None. servlet_classname (str): The name of the handler to be used in prometheus - and opentracing logs. + and tracing logs. """ @@ -340,7 +340,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): requests by method, or override `_async_render` to handle all requests. Args: - extract_context: Whether to attempt to extract the opentracing + extract_context: Whether to attempt to extract the tracing context from the request the servlet is handling. """ @@ -510,7 +510,7 @@ class JsonResource(DirectServeJsonResource): callback: The handler for the request. Usually a Servlet servlet_classname: The name of the handler to be used in prometheus - and opentracing logs. + and tracing logs. """ method_bytes = method.encode("utf-8") @@ -878,19 +878,19 @@ async def _async_write_json_to_request_in_thread( expensive. 
""" - def encode(opentracing_span: "Optional[opentracing.Span]") -> bytes: + def encode(tracing_span: Optional["opentelemetry.trace.Span"]) -> bytes: # it might take a while for the threadpool to schedule us, so we write - # opentracing logs once we actually get scheduled, so that we can see how + # tracing logs once we actually get scheduled, so that we can see how # much that contributed. - if opentracing_span: - opentracing_span.log_kv({"event": "scheduled"}) + if tracing_span: + tracing_span.add_event("scheduled", attributes={"event": "scheduled"}) res = json_encoder(json_object) - if opentracing_span: - opentracing_span.log_kv({"event": "encoded"}) + if tracing_span: + tracing_span.add_event("scheduled", attributes={"event": "encoded"}) return res with start_active_span("encode_json_response"): - span = active_span() + span = get_active_span() json_str = await defer_to_thread(request.reactor, encode, span) _write_bytes_to_request(request, json_str) diff --git a/synapse/http/site.py b/synapse/http/site.py
index eeec74b78a..d82c046dd7 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py
@@ -37,7 +37,7 @@ from synapse.logging.context import ( from synapse.types import Requester if TYPE_CHECKING: - import opentracing + import opentelemetry logger = logging.getLogger(__name__) @@ -85,9 +85,9 @@ class SynapseRequest(Request): # server name, for client requests this is the Requester object. self._requester: Optional[Union[Requester, str]] = None - # An opentracing span for this request. Will be closed when the request is + # An tracing span for this request. Will be closed when the request is # completely processed. - self._opentracing_span: "Optional[opentracing.Span]" = None + self._tracing_span: Optional["opentelemetry.trace.Span"] = None # we can't yet create the logcontext, as we don't know the method. self.logcontext: Optional[LoggingContext] = None @@ -164,12 +164,12 @@ class SynapseRequest(Request): # If there's no authenticated entity, it was the requester. self.logcontext.request.authenticated_entity = authenticated_entity or requester - def set_opentracing_span(self, span: "opentracing.Span") -> None: - """attach an opentracing span to this request + def set_tracing_span(self, span: "opentelemetry.trace.Span") -> None: + """attach an tracing span to this request Doing so will cause the span to be closed when we finish processing the request """ - self._opentracing_span = span + self._tracing_span = span def get_request_id(self) -> str: return "%s-%i" % (self.get_method(), self.request_seq) @@ -309,8 +309,10 @@ class SynapseRequest(Request): self._processing_finished_time = time.time() self._is_processing = False - if self._opentracing_span: - self._opentracing_span.log_kv({"event": "finished processing"}) + if self._tracing_span: + self._tracing_span.add_event( + "finished processing", attributes={"event": "finished processing"} + ) # if we've already sent the response, log it now; otherwise, we wait for the # response to be sent. 
@@ -325,8 +327,10 @@ class SynapseRequest(Request): """ self.finish_time = time.time() Request.finish(self) - if self._opentracing_span: - self._opentracing_span.log_kv({"event": "response sent"}) + if self._tracing_span: + self._tracing_span.add_event( + "response sent", attributes={"event": "response sent"} + ) if not self._is_processing: assert self.logcontext is not None with PreserveLoggingContext(self.logcontext): @@ -361,9 +365,13 @@ class SynapseRequest(Request): with PreserveLoggingContext(self.logcontext): logger.info("Connection from client lost before response was sent") - if self._opentracing_span: - self._opentracing_span.log_kv( - {"event": "client connection lost", "reason": str(reason.value)} + if self._tracing_span: + self._tracing_span.add_event( + "client connection lost", + attributes={ + "event": "client connection lost", + "reason": str(reason.value), + }, ) if self._is_processing: @@ -471,9 +479,9 @@ class SynapseRequest(Request): usage.evt_db_fetch_count, ) - # complete the opentracing span, if any. - if self._opentracing_span: - self._opentracing_span.finish() + # complete the tracing span, if any. + if self._tracing_span: + self._tracing_span.end() try: self.request_metrics.stop(self.finish_time, self.code, self.sentLength) diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index fd9cb97920..a417b13ffd 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py
@@ -46,7 +46,6 @@ from twisted.internet import defer, threads from twisted.python.threadpool import ThreadPool if TYPE_CHECKING: - from synapse.logging.scopecontextmanager import _LogContextScope from synapse.types import ISynapseReactor logger = logging.getLogger(__name__) @@ -221,14 +220,13 @@ LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"] class _Sentinel: """Sentinel to represent the root context""" - __slots__ = ["previous_context", "finished", "request", "scope", "tag"] + __slots__ = ["previous_context", "finished", "request", "tag"] def __init__(self) -> None: # Minimal set for compatibility with LoggingContext self.previous_context = None self.finished = False self.request = None - self.scope = None self.tag = None def __str__(self) -> str: @@ -281,7 +279,6 @@ class LoggingContext: "finished", "request", "tag", - "scope", ] def __init__( @@ -302,7 +299,6 @@ class LoggingContext: self.main_thread = get_thread_id() self.request = None self.tag = "" - self.scope: Optional["_LogContextScope"] = None # keep track of whether we have hit the __exit__ block for this context # (suggesting that the the thing that created the context thinks it should @@ -315,9 +311,6 @@ class LoggingContext: # we track the current request_id self.request = self.parent_context.request - # we also track the current scope: - self.scope = self.parent_context.scope - if request is not None: # the request param overrides the request from the parent context self.request = request diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py deleted file mode 100644
index c1aa205eed..0000000000 --- a/synapse/logging/opentracing.py +++ /dev/null
@@ -1,972 +0,0 @@ -# Copyright 2019 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# NOTE -# This is a small wrapper around opentracing because opentracing is not currently -# packaged downstream (specifically debian). Since opentracing instrumentation is -# fairly invasive it was awkward to make it optional. As a result we opted to encapsulate -# all opentracing state in these methods which effectively noop if opentracing is -# not present. We should strongly consider encouraging the downstream distributers -# to package opentracing and making opentracing a full dependency. In order to facilitate -# this move the methods have work very similarly to opentracing's and it should only -# be a matter of few regexes to move over to opentracing's access patterns proper. - -""" -============================ -Using OpenTracing in Synapse -============================ - -Python-specific tracing concepts are at https://opentracing.io/guides/python/. -Note that Synapse wraps OpenTracing in a small module (this one) in order to make the -OpenTracing dependency optional. That means that the access patterns are -different to those demonstrated in the OpenTracing guides. However, it is -still useful to know, especially if OpenTracing is included as a full dependency -in the future or if you are modifying this module. - - -OpenTracing is encapsulated so that -no span objects from OpenTracing are exposed in Synapse's code. 
This allows -OpenTracing to be easily disabled in Synapse and thereby have OpenTracing as -an optional dependency. This does however limit the number of modifiable spans -at any point in the code to one. From here out references to `opentracing` -in the code snippets refer to the Synapses module. -Most methods provided in the module have a direct correlation to those provided -by opentracing. Refer to docs there for a more in-depth documentation on some of -the args and methods. - -Tracing -------- - -In Synapse it is not possible to start a non-active span. Spans can be started -using the ``start_active_span`` method. This returns a scope (see -OpenTracing docs) which is a context manager that needs to be entered and -exited. This is usually done by using ``with``. - -.. code-block:: python - - from synapse.logging.opentracing import start_active_span - - with start_active_span("operation name"): - # Do something we want to tracer - -Forgetting to enter or exit a scope will result in some mysterious and grievous log -context errors. - -At anytime where there is an active span ``opentracing.set_tag`` can be used to -set a tag on the current active span. - -Tracing functions ------------------ - -Functions can be easily traced using decorators. The name of -the function becomes the operation name for the span. - -.. code-block:: python - - from synapse.logging.opentracing import trace - - # Start a span using 'interesting_function' as the operation name - @trace - def interesting_function(*args, **kwargs): - # Does all kinds of cool and expected things - return something_usual_and_useful - - -Operation names can be explicitly set for a function by using ``trace_with_opname``: - -.. 
code-block:: python - - from synapse.logging.opentracing import trace_with_opname - - @trace_with_opname("a_better_operation_name") - def interesting_badly_named_function(*args, **kwargs): - # Does all kinds of cool and expected things - return something_usual_and_useful - -Setting Tags ------------- - -To set a tag on the active span do - -.. code-block:: python - - from synapse.logging.opentracing import set_tag - - set_tag(tag_name, tag_value) - -There's a convenient decorator to tag all the args of the method. It uses -inspection in order to use the formal parameter names prefixed with 'ARG_' as -tag names. It uses kwarg names as tag names without the prefix. - -.. code-block:: python - - from synapse.logging.opentracing import tag_args - - @tag_args - def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): - pass - - set_fates("the story", "the end", "the act") - # This will have the following tags - # - ARG_clotho: "the story" - # - ARG_lachesis: "the end" - # - ARG_atropos: "the act" - # - father: "Zues" - # - mother: "Themis" - -Contexts and carriers ---------------------- - -There are a selection of wrappers for injecting and extracting contexts from -carriers provided. Unfortunately OpenTracing's three context injection -techniques are not adequate for our inject of OpenTracing span-contexts into -Twisted's http headers, EDU contents and our database tables. Also note that -the binary encoding format mandated by OpenTracing is not actually implemented -by jaeger_client v4.0.0 - it will silently noop. -Please refer to the end of ``logging/opentracing.py`` for the available -injection and extraction methods. - -Homeserver whitelisting ------------------------ - -Most of the whitelist checks are encapsulated in the modules's injection -and extraction method but be aware that using custom carriers or crossing -unchartered waters will require the enforcement of the whitelist. 
-``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes -in a destination and compares it to the whitelist. - -Most injection methods take a 'destination' arg. The context will only be injected -if the destination matches the whitelist or the destination is None. - -======= -Gotchas -======= - -- Checking whitelists on span propagation -- Inserting pii -- Forgetting to enter or exit a scope -- Span source: make sure that the span you expect to be active across a - function call really will be that one. Does the current function have more - than one caller? Will all of those calling functions have be in a context - with an active span? -""" -import contextlib -import enum -import inspect -import logging -import re -from functools import wraps -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Collection, - Dict, - Generator, - Iterable, - List, - Optional, - Pattern, - Type, - TypeVar, - Union, - cast, - overload, -) - -import attr -from typing_extensions import ParamSpec - -from twisted.internet import defer -from twisted.web.http import Request -from twisted.web.http_headers import Headers - -from synapse.config import ConfigError -from synapse.util import json_decoder, json_encoder - -if TYPE_CHECKING: - from synapse.http.site import SynapseRequest - from synapse.server import HomeServer - -# Helper class - - -class _DummyTagNames: - """wrapper of opentracings tags. We need to have them if we - want to reference them without opentracing around. Clearly they - should never actually show up in a trace. 
`set_tags` overwrites - these with the correct ones.""" - - INVALID_TAG = "invalid-tag" - COMPONENT = INVALID_TAG - DATABASE_INSTANCE = INVALID_TAG - DATABASE_STATEMENT = INVALID_TAG - DATABASE_TYPE = INVALID_TAG - DATABASE_USER = INVALID_TAG - ERROR = INVALID_TAG - HTTP_METHOD = INVALID_TAG - HTTP_STATUS_CODE = INVALID_TAG - HTTP_URL = INVALID_TAG - MESSAGE_BUS_DESTINATION = INVALID_TAG - PEER_ADDRESS = INVALID_TAG - PEER_HOSTNAME = INVALID_TAG - PEER_HOST_IPV4 = INVALID_TAG - PEER_HOST_IPV6 = INVALID_TAG - PEER_PORT = INVALID_TAG - PEER_SERVICE = INVALID_TAG - SAMPLING_PRIORITY = INVALID_TAG - SERVICE = INVALID_TAG - SPAN_KIND = INVALID_TAG - SPAN_KIND_CONSUMER = INVALID_TAG - SPAN_KIND_PRODUCER = INVALID_TAG - SPAN_KIND_RPC_CLIENT = INVALID_TAG - SPAN_KIND_RPC_SERVER = INVALID_TAG - - -try: - import opentracing - import opentracing.tags - - tags = opentracing.tags -except ImportError: - opentracing = None # type: ignore[assignment] - tags = _DummyTagNames # type: ignore[assignment] -try: - from jaeger_client import Config as JaegerConfig - - from synapse.logging.scopecontextmanager import LogContextScopeManager -except ImportError: - JaegerConfig = None # type: ignore - LogContextScopeManager = None # type: ignore - - -try: - from rust_python_jaeger_reporter import Reporter - - # jaeger-client 4.7.0 requires that reporters inherit from BaseReporter, which - # didn't exist before that version. 
- try: - from jaeger_client.reporter import BaseReporter - except ImportError: - - class BaseReporter: # type: ignore[no-redef] - pass - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class _WrappedRustReporter(BaseReporter): - """Wrap the reporter to ensure `report_span` never throws.""" - - _reporter: Reporter = attr.Factory(Reporter) - - def set_process(self, *args: Any, **kwargs: Any) -> None: - return self._reporter.set_process(*args, **kwargs) - - def report_span(self, span: "opentracing.Span") -> None: - try: - return self._reporter.report_span(span) - except Exception: - logger.exception("Failed to report span") - - RustReporter: Optional[Type[_WrappedRustReporter]] = _WrappedRustReporter -except ImportError: - RustReporter = None - - -logger = logging.getLogger(__name__) - - -class SynapseTags: - # The message ID of any to_device message processed - TO_DEVICE_MESSAGE_ID = "to_device.message_id" - - # Whether the sync response has new data to be returned to the client. - SYNC_RESULT = "sync.new_data" - - # incoming HTTP request ID (as written in the logs) - REQUEST_ID = "request_id" - - # HTTP request tag (used to distinguish full vs incremental syncs, etc) - REQUEST_TAG = "request_tag" - - # Text description of a database transaction - DB_TXN_DESC = "db.txn_desc" - - # Uniqueish ID of a database transaction - DB_TXN_ID = "db.txn_id" - - # The name of the external cache - CACHE_NAME = "cache.name" - - -class SynapseBaggage: - FORCE_TRACING = "synapse-force-tracing" - - -# Block everything by default -# A regex which matches the server_names to expose traces for. -# None means 'block everything'. -_homeserver_whitelist: Optional[Pattern[str]] = None - -# Util methods - - -class _Sentinel(enum.Enum): - # defining a sentinel in this way allows mypy to correctly handle the - # type of a dictionary lookup. 
- sentinel = object() - - -P = ParamSpec("P") -R = TypeVar("R") -T = TypeVar("T") - - -def only_if_tracing(func: Callable[P, R]) -> Callable[P, Optional[R]]: - """Executes the function only if we're tracing. Otherwise returns None.""" - - @wraps(func) - def _only_if_tracing_inner(*args: P.args, **kwargs: P.kwargs) -> Optional[R]: - if opentracing: - return func(*args, **kwargs) - else: - return None - - return _only_if_tracing_inner - - -@overload -def ensure_active_span( - message: str, -) -> Callable[[Callable[P, R]], Callable[P, Optional[R]]]: - ... - - -@overload -def ensure_active_span( - message: str, ret: T -) -> Callable[[Callable[P, R]], Callable[P, Union[T, R]]]: - ... - - -def ensure_active_span( - message: str, ret: Optional[T] = None -) -> Callable[[Callable[P, R]], Callable[P, Union[Optional[T], R]]]: - """Executes the operation only if opentracing is enabled and there is an active span. - If there is no active span it logs message at the error level. - - Args: - message: Message which fills in "There was no active span when trying to %s" - in the error log if there is no active span and opentracing is enabled. - ret: return value if opentracing is None or there is no active span. - - Returns: - The result of the func, falling back to ret if opentracing is disabled or there - was no active span. - """ - - def ensure_active_span_inner_1( - func: Callable[P, R] - ) -> Callable[P, Union[Optional[T], R]]: - @wraps(func) - def ensure_active_span_inner_2( - *args: P.args, **kwargs: P.kwargs - ) -> Union[Optional[T], R]: - if not opentracing: - return ret - - if not opentracing.tracer.active_span: - logger.error( - "There was no active span when trying to %s." 
- " Did you forget to start one or did a context slip?", - message, - stack_info=True, - ) - - return ret - - return func(*args, **kwargs) - - return ensure_active_span_inner_2 - - return ensure_active_span_inner_1 - - -# Setup - - -def init_tracer(hs: "HomeServer") -> None: - """Set the whitelists and initialise the JaegerClient tracer""" - global opentracing - if not hs.config.tracing.opentracer_enabled: - # We don't have a tracer - opentracing = None # type: ignore[assignment] - return - - if not opentracing or not JaegerConfig: - raise ConfigError( - "The server has been configured to use opentracing but opentracing is not " - "installed." - ) - - # Pull out the jaeger config if it was given. Otherwise set it to something sensible. - # See https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/config.py - - set_homeserver_whitelist(hs.config.tracing.opentracer_whitelist) - - from jaeger_client.metrics.prometheus import PrometheusMetricsFactory - - config = JaegerConfig( - config=hs.config.tracing.jaeger_config, - service_name=f"{hs.config.server.server_name} {hs.get_instance_name()}", - scope_manager=LogContextScopeManager(), - metrics_factory=PrometheusMetricsFactory(), - ) - - # If we have the rust jaeger reporter available let's use that. 
- if RustReporter: - logger.info("Using rust_python_jaeger_reporter library") - assert config.sampler is not None - tracer = config.create_tracer(RustReporter(), config.sampler) - opentracing.set_global_tracer(tracer) - else: - config.initialize_tracer() - - -# Whitelisting - - -@only_if_tracing -def set_homeserver_whitelist(homeserver_whitelist: Iterable[str]) -> None: - """Sets the homeserver whitelist - - Args: - homeserver_whitelist: regexes specifying whitelisted homeservers - """ - global _homeserver_whitelist - if homeserver_whitelist: - # Makes a single regex which accepts all passed in regexes in the list - _homeserver_whitelist = re.compile( - "({})".format(")|(".join(homeserver_whitelist)) - ) - - -@only_if_tracing -def whitelisted_homeserver(destination: str) -> bool: - """Checks if a destination matches the whitelist - - Args: - destination - """ - - if _homeserver_whitelist: - return _homeserver_whitelist.match(destination) is not None - return False - - -# Start spans and scopes - -# Could use kwargs but I want these to be explicit -def start_active_span( - operation_name: str, - child_of: Optional[Union["opentracing.Span", "opentracing.SpanContext"]] = None, - references: Optional[List["opentracing.Reference"]] = None, - tags: Optional[Dict[str, str]] = None, - start_time: Optional[float] = None, - ignore_active_span: bool = False, - finish_on_close: bool = True, - *, - tracer: Optional["opentracing.Tracer"] = None, -) -> "opentracing.Scope": - """Starts an active opentracing span. - - Records the start time for the span, and sets it as the "active span" in the - scope manager. 
- - Args: - See opentracing.tracer - Returns: - scope (Scope) or contextlib.nullcontext - """ - - if opentracing is None: - return contextlib.nullcontext() # type: ignore[unreachable] - - if tracer is None: - # use the global tracer by default - tracer = opentracing.tracer - - return tracer.start_active_span( - operation_name, - child_of=child_of, - references=references, - tags=tags, - start_time=start_time, - ignore_active_span=ignore_active_span, - finish_on_close=finish_on_close, - ) - - -def start_active_span_follows_from( - operation_name: str, - contexts: Collection, - child_of: Optional[Union["opentracing.Span", "opentracing.SpanContext"]] = None, - start_time: Optional[float] = None, - *, - inherit_force_tracing: bool = False, - tracer: Optional["opentracing.Tracer"] = None, -) -> "opentracing.Scope": - """Starts an active opentracing span, with additional references to previous spans - - Args: - operation_name: name of the operation represented by the new span - contexts: the previous spans to inherit from - - child_of: optionally override the parent span. If unset, the currently active - span will be the parent. (If there is no currently active span, the first - span in `contexts` will be the parent.) - - start_time: optional override for the start time of the created span. Seconds - since the epoch. - - inherit_force_tracing: if set, and any of the previous contexts have had tracing - forced, the new span will also have tracing forced. - tracer: override the opentracing tracer. By default the global tracer is used. 
- """ - if opentracing is None: - return contextlib.nullcontext() # type: ignore[unreachable] - - references = [opentracing.follows_from(context) for context in contexts] - scope = start_active_span( - operation_name, - child_of=child_of, - references=references, - start_time=start_time, - tracer=tracer, - ) - - if inherit_force_tracing and any( - is_context_forced_tracing(ctx) for ctx in contexts - ): - force_tracing(scope.span) - - return scope - - -def start_active_span_from_edu( - edu_content: Dict[str, Any], - operation_name: str, - references: Optional[List["opentracing.Reference"]] = None, - tags: Optional[Dict[str, str]] = None, - start_time: Optional[float] = None, - ignore_active_span: bool = False, - finish_on_close: bool = True, -) -> "opentracing.Scope": - """ - Extracts a span context from an edu and uses it to start a new active span - - Args: - edu_content: an edu_content with a `context` field whose value is - canonical json for a dict which contains opentracing information. - - For the other args see opentracing.tracer - """ - references = references or [] - - if opentracing is None: - return contextlib.nullcontext() # type: ignore[unreachable] - - carrier = json_decoder.decode(edu_content.get("context", "{}")).get( - "opentracing", {} - ) - context = opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier) - _references = [ - opentracing.child_of(span_context_from_string(x)) - for x in carrier.get("references", []) - ] - - # For some reason jaeger decided not to support the visualization of multiple parent - # spans or explicitly show references. I include the span context as a tag here as - # an aid to people debugging but it's really not an ideal solution. 
- - references += _references - - scope = opentracing.tracer.start_active_span( - operation_name, - child_of=context, - references=references, - tags=tags, - start_time=start_time, - ignore_active_span=ignore_active_span, - finish_on_close=finish_on_close, - ) - - scope.span.set_tag("references", carrier.get("references", [])) - return scope - - -# Opentracing setters for tags, logs, etc -@only_if_tracing -def active_span() -> Optional["opentracing.Span"]: - """Get the currently active span, if any""" - return opentracing.tracer.active_span - - -@ensure_active_span("set a tag") -def set_tag(key: str, value: Union[str, bool, int, float]) -> None: - """Sets a tag on the active span""" - assert opentracing.tracer.active_span is not None - opentracing.tracer.active_span.set_tag(key, value) - - -@ensure_active_span("log") -def log_kv(key_values: Dict[str, Any], timestamp: Optional[float] = None) -> None: - """Log to the active span""" - assert opentracing.tracer.active_span is not None - opentracing.tracer.active_span.log_kv(key_values, timestamp) - - -@ensure_active_span("set the traces operation name") -def set_operation_name(operation_name: str) -> None: - """Sets the operation name of the active span""" - assert opentracing.tracer.active_span is not None - opentracing.tracer.active_span.set_operation_name(operation_name) - - -@only_if_tracing -def force_tracing( - span: Union["opentracing.Span", _Sentinel] = _Sentinel.sentinel -) -> None: - """Force sampling for the active/given span and its children. - - Args: - span: span to force tracing for. By default, the active span. 
- """ - if isinstance(span, _Sentinel): - span_to_trace = opentracing.tracer.active_span - else: - span_to_trace = span - if span_to_trace is None: - logger.error("No active span in force_tracing") - return - - span_to_trace.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1) - - # also set a bit of baggage, so that we have a way of figuring out if - # it is enabled later - span_to_trace.set_baggage_item(SynapseBaggage.FORCE_TRACING, "1") - - -def is_context_forced_tracing( - span_context: Optional["opentracing.SpanContext"], -) -> bool: - """Check if sampling has been force for the given span context.""" - if span_context is None: - return False - return span_context.baggage.get(SynapseBaggage.FORCE_TRACING) is not None - - -# Injection and extraction - - -@ensure_active_span("inject the span into a header dict") -def inject_header_dict( - headers: Dict[bytes, List[bytes]], - destination: Optional[str] = None, - check_destination: bool = True, -) -> None: - """ - Injects a span context into a dict of HTTP headers - - Args: - headers: the dict to inject headers into - destination: address of entity receiving the span context. Must be given unless - check_destination is False. The context will only be injected if the - destination matches the opentracing whitelist - check_destination (bool): If false, destination will be ignored and the context - will always be injected. - - Note: - The headers set by the tracer are custom to the tracer implementation which - should be unique enough that they don't interfere with any headers set by - synapse or twisted. 
If we're still using jaeger these headers would be those - here: - https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py - """ - if check_destination: - if destination is None: - raise ValueError( - "destination must be given unless check_destination is False" - ) - if not whitelisted_homeserver(destination): - return - - span = opentracing.tracer.active_span - - carrier: Dict[str, str] = {} - assert span is not None - opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier) - - for key, value in carrier.items(): - headers[key.encode()] = [value.encode()] - - -def inject_response_headers(response_headers: Headers) -> None: - """Inject the current trace id into the HTTP response headers""" - if not opentracing: - return - span = opentracing.tracer.active_span - if not span: - return - - # This is a bit implementation-specific. - # - # Jaeger's Spans have a trace_id property; other implementations (including the - # dummy opentracing.span.Span which we use if init_tracer is not called) do not - # expose it - trace_id = getattr(span, "trace_id", None) - - if trace_id is not None: - response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}") - - -@ensure_active_span( - "get the active span context as a dict", ret=cast(Dict[str, str], {}) -) -def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str]: - """ - Gets a span context as a dict. This can be used instead of manually - injecting a span into an empty carrier. - - Args: - destination: the name of the remote server. - - Returns: - dict: the active span's context if opentracing is enabled, otherwise empty. 
- """ - - if destination and not whitelisted_homeserver(destination): - return {} - - carrier: Dict[str, str] = {} - assert opentracing.tracer.active_span is not None - opentracing.tracer.inject( - opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier - ) - - return carrier - - -@ensure_active_span("get the span context as a string.", ret={}) -def active_span_context_as_string() -> str: - """ - Returns: - The active span context encoded as a string. - """ - carrier: Dict[str, str] = {} - if opentracing: - assert opentracing.tracer.active_span is not None - opentracing.tracer.inject( - opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier - ) - return json_encoder.encode(carrier) - - -def span_context_from_request(request: Request) -> "Optional[opentracing.SpanContext]": - """Extract an opentracing context from the headers on an HTTP request - - This is useful when we have received an HTTP request from another part of our - system, and want to link our spans to those of the remote system. - """ - if not opentracing: - return None - header_dict = { - k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() - } - return opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict) - - -@only_if_tracing -def span_context_from_string(carrier: str) -> Optional["opentracing.SpanContext"]: - """ - Returns: - The active span context decoded from a string. - """ - payload: Dict[str, str] = json_decoder.decode(carrier) - return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, payload) - - -@only_if_tracing -def extract_text_map(carrier: Dict[str, str]) -> Optional["opentracing.SpanContext"]: - """ - Wrapper method for opentracing's tracer.extract for TEXT_MAP. - Args: - carrier: a dict possibly containing a span context. - - Returns: - The active span context extracted from carrier. 
- """ - return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier) - - -# Tracing decorators - - -def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]: - """ - Decorator to trace a function with a custom opname. - - See the module's doc string for usage examples. - - """ - - def decorator(func: Callable[P, R]) -> Callable[P, R]: - if opentracing is None: - return func # type: ignore[unreachable] - - if inspect.iscoroutinefunction(func): - - @wraps(func) - async def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R: - with start_active_span(opname): - return await func(*args, **kwargs) # type: ignore[misc] - - else: - # The other case here handles both sync functions and those - # decorated with inlineDeferred. - @wraps(func) - def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R: - scope = start_active_span(opname) - scope.__enter__() - - try: - result = func(*args, **kwargs) - if isinstance(result, defer.Deferred): - - def call_back(result: R) -> R: - scope.__exit__(None, None, None) - return result - - def err_back(result: R) -> R: - scope.__exit__(None, None, None) - return result - - result.addCallbacks(call_back, err_back) - - else: - if inspect.isawaitable(result): - logger.error( - "@trace may not have wrapped %s correctly! " - "The function is not async but returned a %s.", - func.__qualname__, - type(result).__name__, - ) - - scope.__exit__(None, None, None) - - return result - - except Exception as e: - scope.__exit__(type(e), None, e.__traceback__) - raise - - return _trace_inner # type: ignore[return-value] - - return decorator - - -def trace(func: Callable[P, R]) -> Callable[P, R]: - """ - Decorator to trace a function. - - Sets the operation name to that of the function's name. - - See the module's doc string for usage examples. - """ - - return trace_with_opname(func.__name__)(func) - - -def tag_args(func: Callable[P, R]) -> Callable[P, R]: - """ - Tags all of the args to the active span. 
- """ - - if not opentracing: - return func - - @wraps(func) - def _tag_args_inner(*args: P.args, **kwargs: P.kwargs) -> R: - argspec = inspect.getfullargspec(func) - for i, arg in enumerate(argspec.args[1:]): - set_tag("ARG_" + arg, str(args[i])) # type: ignore[index] - set_tag("args", str(args[len(argspec.args) :])) # type: ignore[index] - set_tag("kwargs", str(kwargs)) - return func(*args, **kwargs) - - return _tag_args_inner - - -@contextlib.contextmanager -def trace_servlet( - request: "SynapseRequest", extract_context: bool = False -) -> Generator[None, None, None]: - """Returns a context manager which traces a request. It starts a span - with some servlet specific tags such as the request metrics name and - request information. - - Args: - request - extract_context: Whether to attempt to extract the opentracing - context from the request the servlet is handling. - """ - - if opentracing is None: - yield # type: ignore[unreachable] - return - - request_tags = { - SynapseTags.REQUEST_ID: request.get_request_id(), - tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, - tags.HTTP_METHOD: request.get_method(), - tags.HTTP_URL: request.get_redacted_uri(), - tags.PEER_HOST_IPV6: request.getClientAddress().host, - } - - request_name = request.request_metrics.name - context = span_context_from_request(request) if extract_context else None - - # we configure the scope not to finish the span immediately on exit, and instead - # pass the span into the SynapseRequest, which will finish it once we've finished - # sending the response to the client. - scope = start_active_span(request_name, child_of=context, finish_on_close=False) - request.set_opentracing_span(scope.span) - - with scope: - inject_response_headers(request.responseHeaders) - try: - yield - finally: - # We set the operation name again in case its changed (which happens - # with JsonResource). 
- scope.span.set_operation_name(request.request_metrics.name) - - # set the tags *after* the servlet completes, in case it decided to - # prioritise the span (tags will get dropped on unprioritised spans) - request_tags[ - SynapseTags.REQUEST_TAG - ] = request.request_metrics.start_context.tag - - for k, v in request_tags.items(): - scope.span.set_tag(k, v) diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py deleted file mode 100644
index 10877bdfc5..0000000000 --- a/synapse/logging/scopecontextmanager.py +++ /dev/null
@@ -1,171 +0,0 @@ -# Copyright 2019 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License.import logging - -import logging -from types import TracebackType -from typing import Optional, Type - -from opentracing import Scope, ScopeManager, Span - -import twisted - -from synapse.logging.context import ( - LoggingContext, - current_context, - nested_logging_context, -) - -logger = logging.getLogger(__name__) - - -class LogContextScopeManager(ScopeManager): - """ - The LogContextScopeManager tracks the active scope in opentracing - by using the log contexts which are native to synapse. This is so - that the basic opentracing api can be used across twisted defereds. - - It would be nice just to use opentracing's ContextVarsScopeManager, - but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301. - """ - - def __init__(self) -> None: - pass - - @property - def active(self) -> Optional[Scope]: - """ - Returns the currently active Scope which can be used to access the - currently active Scope.span. - If there is a non-null Scope, its wrapped Span - becomes an implicit parent of any newly-created Span at - Tracer.start_active_span() time. - - Return: - The Scope that is active, or None if not available. - """ - ctx = current_context() - return ctx.scope - - def activate(self, span: Span, finish_on_close: bool) -> Scope: - """ - Makes a Span active. - Args - span: the span that should become active. 
- finish_on_close: whether Span should be automatically finished when - Scope.close() is called. - - Returns: - Scope to control the end of the active period for - *span*. It is a programming error to neglect to call - Scope.close() on the returned instance. - """ - - ctx = current_context() - - if not ctx: - logger.error("Tried to activate scope outside of loggingcontext") - return Scope(None, span) # type: ignore[arg-type] - - if ctx.scope is not None: - # start a new logging context as a child of the existing one. - # Doing so -- rather than updating the existing logcontext -- means that - # creating several concurrent spans under the same logcontext works - # correctly. - ctx = nested_logging_context("") - enter_logcontext = True - else: - # if there is no span currently associated with the current logcontext, we - # just store the scope in it. - # - # This feels a bit dubious, but it does hack around a problem where a - # span outlasts its parent logcontext (which would otherwise lead to - # "Re-starting finished log context" errors). - enter_logcontext = False - - scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close) - ctx.scope = scope - if enter_logcontext: - ctx.__enter__() - - return scope - - -class _LogContextScope(Scope): - """ - A custom opentracing scope, associated with a LogContext - - * filters out _DefGen_Return exceptions which arise from calling - `defer.returnValue` in Twisted code - - * When the scope is closed, the logcontext's active scope is reset to None. - and - if enter_logcontext was set - the logcontext is finished too. - """ - - def __init__( - self, - manager: LogContextScopeManager, - span: Span, - logcontext: LoggingContext, - enter_logcontext: bool, - finish_on_close: bool, - ): - """ - Args: - manager: - the manager that is responsible for this scope. - span: - the opentracing span which this scope represents the local - lifetime for. - logcontext: - the log context to which this scope is attached. 
- enter_logcontext: - if True the log context will be exited when the scope is finished - finish_on_close: - if True finish the span when the scope is closed - """ - super().__init__(manager, span) - self.logcontext = logcontext - self._finish_on_close = finish_on_close - self._enter_logcontext = enter_logcontext - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - value: Optional[BaseException], - traceback: Optional[TracebackType], - ) -> None: - if exc_type == twisted.internet.defer._DefGen_Return: - # filter out defer.returnValue() calls - exc_type = value = traceback = None - super().__exit__(exc_type, value, traceback) - - def __str__(self) -> str: - return f"Scope<{self.span}>" - - def close(self) -> None: - active_scope = self.manager.active - if active_scope is not self: - logger.error( - "Closing scope %s which is not the currently-active one %s", - self, - active_scope, - ) - - if self._finish_on_close: - self.span.finish() - - self.logcontext.scope = None - - if self._enter_logcontext: - self.logcontext.__exit__(None, None, None) diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py new file mode 100644
index 0000000000..e3a1a010a2 --- /dev/null +++ b/synapse/logging/tracing.py
@@ -0,0 +1,942 @@ +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# NOTE This is a small wrapper around opentelemetry because tracing is optional +# and not always packaged downstream. Since opentelemetry instrumentation is +# fairly invasive it was awkward to make it optional. As a result we opted to +# encapsulate all opentelemetry state in these methods which effectively noop if +# opentelemetry is not present. We should strongly consider encouraging the +# downstream distributers to package opentelemetry and making opentelemetry a +# full dependency. In order to facilitate this move the methods have work very +# similarly to opentelemetry's and it should only be a matter of few regexes to +# move over to opentelemetry's access patterns proper. + +""" +============================ +Using OpenTelemetry in Synapse +============================ + +Python-specific tracing concepts are at +https://opentelemetry.io/docs/instrumentation/python/. Note that Synapse wraps +OpenTelemetry in a small module (this one) in order to make the OpenTelemetry +dependency optional. That means that some access patterns are different to those +demonstrated in the OpenTelemetry guides. However, it is still useful to know, +especially if OpenTelemetry is included as a full dependency in the future or if +you are modifying this module. 
+ + +OpenTelemetry is encapsulated so that no span objects from OpenTelemetry are +exposed in Synapse's code. This allows OpenTelemetry to be easily disabled in +Synapse and thereby have OpenTelemetry as an optional dependency. This does +however limit the number of modifiable spans at any point in the code to one. +From here out references to `tracing` in the code snippets refer to the Synapses +module. Most methods provided in the module have a direct correlation to those +provided by OpenTelemetry. Refer to docs there for a more in-depth documentation +on some of the args and methods. + +Tracing +------- + +In Synapse, it is not possible to start a non-active span. Spans can be started +using the ``start_active_span`` method. This returns a context manager that +needs to be entered and exited to expose the ``span``. This is usually done by +using a ``with`` statement. + +.. code-block:: python + + from synapse.logging.tracing import start_active_span + + with start_active_span("operation name"): + # Do something we want to trace + +Forgetting to enter or exit a scope will result in unstarted and unfinished +spans that will not be reported (exported). + +At anytime where there is an active span ``set_attribute`` can be +used to set a tag on the current active span. + +Tracing functions +----------------- + +Functions can be easily traced using decorators. The name of the function +becomes the operation name for the span. + +.. code-block:: python + + from synapse.logging.tracing import trace + + # Start a span using 'interesting_function' as the operation name + @trace + def interesting_function(*args, **kwargs): + # Does all kinds of cool and expected things return + something_usual_and_useful + + +Operation names can be explicitly set for a function by using +``trace_with_opname``: + +.. 
code-block:: python + + from synapse.logging.tracing import trace_with_opname + + @trace_with_opname("a_better_operation_name") + def interesting_badly_named_function(*args, **kwargs): + # Does all kinds of cool and expected things return + something_usual_and_useful + +Setting Tags +------------ + +To set a tag on the active span do + +.. code-block:: python + + from synapse.logging.tracing import set_attribute + + set_attribute(tag_name, tag_value) + +There's a convenient decorator to tag all the args of the method. It uses +inspection in order to use the formal parameter names prefixed with 'ARG_' as +tag names. It uses kwarg names as tag names without the prefix. + +.. code-block:: python + from synapse.logging.tracing import tag_args + @tag_args + def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): + pass + set_fates("the story", "the end", "the act") + # This will have the following tags + # - ARG_clotho: "the story" + # - ARG_lachesis: "the end" + # - ARG_atropos: "the act" + # - father: "Zues" + # - mother: "Themis" + +Contexts and carriers +--------------------- + +There are a selection of wrappers for injecting and extracting contexts from +carriers provided. We use these to inject of OpenTelemetry Contexts into +Twisted's http headers, EDU contents and our database tables. Please refer to +the end of ``logging/tracing.py`` for the available injection and extraction +methods. + +Homeserver whitelisting +----------------------- + +Most of the whitelist checks are encapsulated in the modules's injection and +extraction method but be aware that using custom carriers or crossing +unchartered waters will require the enforcement of the whitelist. +``logging/tracing.py`` has a ``whitelisted_homeserver`` method which takes +in a destination and compares it to the whitelist. + +Most injection methods take a 'destination' arg. The context will only be +injected if the destination matches the whitelist or the destination is None. 
+ +======= +Gotchas +======= + +- Checking whitelists on span propagation +- Inserting pii +- Forgetting to enter or exit a scope +- Span source: make sure that the span you expect to be active across a function + call really will be that one. Does the current function have more than one + caller? Will all of those calling functions have be in a context with an + active span? +""" +import contextlib +import inspect +import logging +import re +from abc import ABC +from functools import wraps +from typing import ( + TYPE_CHECKING, + Any, + Callable, + ContextManager, + Dict, + Generator, + Iterable, + List, + Optional, + Pattern, + Sequence, + TypeVar, + Union, + cast, + overload, +) + +from typing_extensions import ParamSpec + +from twisted.internet import defer +from twisted.web.http import Request +from twisted.web.http_headers import Headers + +from synapse.api.constants import EventContentFields +from synapse.config import ConfigError +from synapse.util import json_decoder + +if TYPE_CHECKING: + from synapse.http.site import SynapseRequest + from synapse.server import HomeServer + +# Helper class + +T = TypeVar("T") + + +class _DummyLookup(object): + """This will always returns the fixed value given for any accessed property""" + + def __init__(self, value: T) -> None: + self.value = value + + def __getattribute__(self, name: str) -> T: + return object.__getattribute__(self, "value") + + +class DummyLink(ABC): + """Dummy placeholder for `opentelemetry.trace.Link`""" + + def __init__(self) -> None: + self.not_implemented_message = ( + "opentelemetry isn't installed so this is just a dummy link placeholder" + ) + + @property + def context(self) -> None: + raise NotImplementedError(self.not_implemented_message) + + @property + def attributes(self) -> None: + raise NotImplementedError(self.not_implemented_message) + + +# These dependencies are optional so they can fail to import +# and we +try: + import opentelemetry + import opentelemetry.exporter.jaeger.thrift + 
import opentelemetry.propagate + import opentelemetry.sdk.resources + import opentelemetry.sdk.trace + import opentelemetry.sdk.trace.export + import opentelemetry.semconv.trace + import opentelemetry.trace + import opentelemetry.trace.propagation + import opentelemetry.trace.status + + SpanKind = opentelemetry.trace.SpanKind + SpanAttributes = opentelemetry.semconv.trace.SpanAttributes + StatusCode = opentelemetry.trace.status.StatusCode + Link = opentelemetry.trace.Link +except ImportError: + opentelemetry = None # type: ignore[assignment] + SpanKind = _DummyLookup(0) # type: ignore + SpanAttributes = _DummyLookup("fake-attribute") # type: ignore + StatusCode = _DummyLookup(0) # type: ignore + Link = DummyLink # type: ignore + + +logger = logging.getLogger(__name__) + + +class SynapseTags: + """FIXME: Rename to `SynapseAttributes` so it matches OpenTelemetry `SpanAttributes`""" + + # The message ID of any to_device message processed + TO_DEVICE_MESSAGE_ID = "to_device.message_id" + + # Whether the sync response has new data to be returned to the client. + SYNC_RESULT = "sync.new_data" + + # incoming HTTP request ID (as written in the logs) + REQUEST_ID = "request_id" + + # HTTP request tag (used to distinguish full vs incremental syncs, etc) + REQUEST_TAG = "request_tag" + + # Text description of a database transaction + DB_TXN_DESC = "db.txn_desc" + + # Uniqueish ID of a database transaction + DB_TXN_ID = "db.txn_id" + + # The name of the external cache + CACHE_NAME = "cache.name" + + +class SynapseBaggage: + FORCE_TRACING = "synapse-force-tracing" + + +# Block everything by default +# A regex which matches the server_names to expose traces for. +# None means 'block everything'. +_homeserver_whitelist: Optional[Pattern[str]] = None + +# Util methods + + +P = ParamSpec("P") +R = TypeVar("R") + + +def only_if_tracing(func: Callable[P, R]) -> Callable[P, Optional[R]]: + """Decorator function that executes the function only if we're tracing. 
Otherwise returns None.""" + + @wraps(func) + def _only_if_tracing_inner(*args: P.args, **kwargs: P.kwargs) -> Optional[R]: + if opentelemetry: + return func(*args, **kwargs) + else: + return None + + return _only_if_tracing_inner + + +@overload +def ensure_active_span( + message: str, +) -> Callable[[Callable[P, R]], Callable[P, Optional[R]]]: + ... + + +@overload +def ensure_active_span( + message: str, ret: T +) -> Callable[[Callable[P, R]], Callable[P, Union[T, R]]]: + ... + + +def ensure_active_span( + message: str, ret: Optional[T] = None +) -> Callable[[Callable[P, R]], Callable[P, Union[Optional[T], R]]]: + """Executes the operation only if opentelemetry is enabled and there is an active span. + If there is no active span it logs message at the error level. + + Args: + message: Message which fills in "There was no active span when trying to %s" + in the error log if there is no active span and opentelemetry is enabled. + ret: return value if opentelemetry is None or there is no active span. + + Returns: + The result of the func, falling back to ret if opentelemetry is disabled or there + was no active span. + """ + + def ensure_active_span_inner_1( + func: Callable[P, R] + ) -> Callable[P, Union[Optional[T], R]]: + @wraps(func) + def ensure_active_span_inner_2( + *args: P.args, **kwargs: P.kwargs + ) -> Union[Optional[T], R]: + if not opentelemetry: + return ret + + if not opentelemetry.trace.get_current_span(): + logger.error( + "There was no active span when trying to %s." 
+ " Did you forget to start one or did a context slip?", + message, + stack_info=True, + ) + + return ret + + return func(*args, **kwargs) + + return ensure_active_span_inner_2 + + return ensure_active_span_inner_1 + + +# Setup + + +def init_tracer(hs: "HomeServer") -> None: + """Set the whitelists and initialise the OpenTelemetry tracer""" + global opentelemetry + if not hs.config.tracing.tracing_enabled: + # We don't have a tracer + opentelemetry = None # type: ignore[assignment] + return + + if not opentelemetry: + raise ConfigError( + "The server has been configured to use OpenTelemetry but OpenTelemetry is not " + "installed." + ) + + # Pull out of the config if it was given. Otherwise set it to something sensible. + set_homeserver_whitelist(hs.config.tracing.homeserver_whitelist) + + resource = opentelemetry.sdk.resources.Resource( + attributes={ + opentelemetry.sdk.resources.SERVICE_NAME: f"{hs.config.server.server_name} {hs.get_instance_name()}" + } + ) + + # TODO: `force_tracing_for_users` is not compatible with OTEL samplers + # because you can only determine `opentelemetry.trace.TraceFlags.SAMPLED` + # and whether it uses a recording span when the span is created and we don't + # have enough information at that time (we can determine in + # `synapse/api/auth.py`). There isn't a way to change the trace flags after + # the fact so there is no way to programmatically force + # recording/tracing/sampling like there was in opentracing. 
+ sampler = opentelemetry.sdk.trace.sampling.ParentBasedTraceIdRatio( + hs.config.tracing.sample_rate + ) + + tracer_provider = opentelemetry.sdk.trace.TracerProvider( + resource=resource, sampler=sampler + ) + + # consoleProcessor = opentelemetry.sdk.trace.export.BatchSpanProcessor( + # opentelemetry.sdk.trace.export.ConsoleSpanExporter() + # ) + # tracer_provider.add_span_processor(consoleProcessor) + + jaeger_exporter = opentelemetry.exporter.jaeger.thrift.JaegerExporter( + **hs.config.tracing.jaeger_exporter_config + ) + jaeger_processor = opentelemetry.sdk.trace.export.BatchSpanProcessor( + jaeger_exporter + ) + tracer_provider.add_span_processor(jaeger_processor) + + # Sets the global default tracer provider + opentelemetry.trace.set_tracer_provider(tracer_provider) + + +# Whitelisting + + +@only_if_tracing +def set_homeserver_whitelist(homeserver_whitelist: Iterable[str]) -> None: + """Sets the homeserver whitelist + + Args: + homeserver_whitelist: regexes specifying whitelisted homeservers + """ + global _homeserver_whitelist + if homeserver_whitelist: + # Makes a single regex which accepts all passed in regexes in the list + _homeserver_whitelist = re.compile( + "({})".format(")|(".join(homeserver_whitelist)) + ) + + +@only_if_tracing +def whitelisted_homeserver(destination: str) -> bool: + """Checks if a destination matches the whitelist + + Args: + destination + """ + + if _homeserver_whitelist: + return _homeserver_whitelist.match(destination) is not None + return False + + +# Start spans and scopes + + +def use_span( + span: "opentelemetry.trace.Span", + end_on_exit: bool = True, +) -> ContextManager["opentelemetry.trace.Span"]: + if opentelemetry is None: + return contextlib.nullcontext() # type: ignore[unreachable] + + return opentelemetry.trace.use_span(span=span, end_on_exit=end_on_exit) + + +def create_non_recording_span() -> "opentelemetry.trace.Span": + """Create a no-op span that does not record or become part of a recorded trace""" + + return 
opentelemetry.trace.NonRecordingSpan( + opentelemetry.trace.INVALID_SPAN_CONTEXT + ) + + +def start_span( + name: str, + *, + context: Optional["opentelemetry.context.context.Context"] = None, + kind: Optional["opentelemetry.trace.SpanKind"] = SpanKind.INTERNAL, + attributes: "opentelemetry.util.types.Attributes" = None, + links: Optional[Sequence["opentelemetry.trace.Link"]] = None, + start_time: Optional[int] = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + end_on_exit: bool = True, + # For testing only + tracer: Optional["opentelemetry.trace.Tracer"] = None, +) -> "opentelemetry.trace.Span": + if opentelemetry is None: + raise Exception("Not able to create span without opentelemetry installed.") + + if tracer is None: + tracer = opentelemetry.trace.get_tracer(__name__) + + # TODO: Why is this necessary to satisfy this error? It has a default? + # ` error: Argument "kind" to "start_span" of "Tracer" has incompatible type "Optional[SpanKind]"; expected "SpanKind" [arg-type]` + if kind is None: + kind = SpanKind.INTERNAL + + return tracer.start_span( + name=name, + context=context, + kind=kind, + attributes=attributes, + links=links, + start_time=start_time, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + ) + + +def start_active_span( + name: str, + *, + context: Optional["opentelemetry.context.context.Context"] = None, + kind: Optional["opentelemetry.trace.SpanKind"] = SpanKind.INTERNAL, + attributes: "opentelemetry.util.types.Attributes" = None, + links: Optional[Sequence["opentelemetry.trace.Link"]] = None, + start_time: Optional[int] = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + end_on_exit: bool = True, + # For testing only + tracer: Optional["opentelemetry.trace.Tracer"] = None, +) -> ContextManager["opentelemetry.trace.Span"]: + if opentelemetry is None: + return contextlib.nullcontext() # type: ignore[unreachable] + + # TODO: Why is this 
necessary to satisfy this error? It has a default? + # ` error: Argument "kind" to "start_span" of "Tracer" has incompatible type "Optional[SpanKind]"; expected "SpanKind" [arg-type]` + if kind is None: + kind = SpanKind.INTERNAL + + span = start_span( + name=name, + context=context, + kind=kind, + attributes=attributes, + links=links, + start_time=start_time, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + tracer=tracer, + ) + + # Equivalent to `tracer.start_as_current_span` + return opentelemetry.trace.use_span( + span, + end_on_exit=end_on_exit, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + ) + + +def start_active_span_from_edu( + operation_name: str, + *, + edu_content: Dict[str, Any], +) -> ContextManager["opentelemetry.trace.Span"]: + """ + Extracts a span context from an edu and uses it to start a new active span + + Args: + operation_name: The label for the chunk of time used to process the given edu. + edu_content: an edu_content with a `context` field whose value is + canonical json for a dict which contains tracing information. 
+ """ + if opentelemetry is None: + return contextlib.nullcontext() # type: ignore[unreachable] + + carrier = json_decoder.decode( + edu_content.get(EventContentFields.TRACING_CONTEXT, "{}") + ) + + context = extract_text_map(carrier) + + return start_active_span(name=operation_name, context=context) + + +# OpenTelemetry setters for attributes, logs, etc +@only_if_tracing +def get_active_span() -> Optional["opentelemetry.trace.Span"]: + """Get the currently active span, if any""" + return opentelemetry.trace.get_current_span() + + +def get_span_context_from_context( + context: "opentelemetry.context.context.Context", +) -> Optional["opentelemetry.trace.SpanContext"]: + """Utility function to convert a `Context` to a `SpanContext` + + Based on https://github.com/open-telemetry/opentelemetry-python/blob/43288ca9a36144668797c11ca2654836ec8b5e99/opentelemetry-api/src/opentelemetry/trace/propagation/tracecontext.py#L99-L102 + """ + span = opentelemetry.trace.get_current_span(context=context) + span_context = span.get_span_context() + if span_context == opentelemetry.trace.INVALID_SPAN_CONTEXT: + return None + return span_context + + +def get_context_from_span( + span: "opentelemetry.trace.Span", +) -> "opentelemetry.context.context.Context": + # This doesn't affect the current context at all, it just converts a span + # into `Context` object basically (bad name). 
+ ctx = opentelemetry.trace.propagation.set_span_in_context(span) + return ctx + + +@ensure_active_span("set a tag") +def set_attribute(key: str, value: Union[str, bool, int, float]) -> None: + """Sets a tag on the active span""" + active_span = get_active_span() + assert active_span is not None + active_span.set_attribute(key, value) + + +@ensure_active_span("set the status") +def set_status( + status_code: "opentelemetry.trace.status.StatusCode", exc: Optional[Exception] +) -> None: + """Sets a tag on the active span""" + active_span = get_active_span() + assert active_span is not None + active_span.set_status(opentelemetry.trace.status.Status(status_code=status_code)) + if exc: + active_span.record_exception(exc) + + +DEFAULT_LOG_NAME = "log" + + +@ensure_active_span("log") +def log_kv(key_values: Dict[str, Any], timestamp: Optional[int] = None) -> None: + """Log to the active span""" + active_span = get_active_span() + assert active_span is not None + event_name = key_values.get("event", DEFAULT_LOG_NAME) + active_span.add_event(event_name, attributes=key_values, timestamp=timestamp) + + +@only_if_tracing +def force_tracing(span: Optional["opentelemetry.trace.Span"] = None) -> None: + """Force sampling for the active/given span and its children. + + Args: + span: span to force tracing for. By default, the active span. 
+ """ + # TODO + pass + + + def is_context_forced_tracing( + context: "opentelemetry.context.context.Context", + ) -> bool: + """Check if sampling has been forced for the given span context.""" + # TODO + return False + + + # Injection and extraction + + + @ensure_active_span("inject the active tracing context into a header dict") + def inject_active_tracing_context_into_header_dict( + headers: Dict[bytes, List[bytes]], + destination: Optional[str] = None, + check_destination: bool = True, + ) -> None: + """ + Injects the active tracing context into a dict of HTTP headers + + Args: + headers: the dict to inject headers into + destination: address of entity receiving the span context. Must be given unless + `check_destination` is False. + check_destination (bool): If False, destination will be ignored and the context + will always be injected. If True, the context will only be injected if the + destination matches the tracing allowlist + + Note: + The headers set by the tracer are custom to the tracer implementation which + should be unique enough that they don't interfere with any headers set by + synapse or twisted. 
+ """ + if check_destination: + if destination is None: + raise ValueError( + "destination must be given unless check_destination is False" + ) + if not whitelisted_homeserver(destination): + return + + active_span = get_active_span() + assert active_span is not None + ctx = get_context_from_span(active_span) + + propagator = opentelemetry.propagate.get_global_textmap() + # Put all of SpanContext properties into the headers dict + propagator.inject(headers, context=ctx) + + +def inject_trace_id_into_response_headers(response_headers: Headers) -> None: + """Inject the current trace id into the HTTP response headers""" + if not opentelemetry: + return + active_span = get_active_span() + if not active_span: + return + + trace_id = active_span.get_span_context().trace_id + + if trace_id is not None: + response_headers.addRawHeader("Synapse-Trace-Id", f"{trace_id:x}") + + +@ensure_active_span( + "get the active span context as a dict", ret=cast(Dict[str, str], {}) +) +def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str]: + """ + Gets the active tracing Context serialized as a dict. This can be used + instead of manually injecting a span into an empty carrier. + + Args: + destination: the name of the remote server. + + Returns: + dict: the serialized active span's context if opentelemetry is enabled, otherwise + empty. 
+ """ + if destination and not whitelisted_homeserver(destination): + return {} + + active_span = get_active_span() + assert active_span is not None + ctx = get_context_from_span(active_span) + + carrier_text_map: Dict[str, str] = {} + propagator = opentelemetry.propagate.get_global_textmap() + # Put all of Context properties onto the carrier text map that we can return + propagator.inject(carrier_text_map, context=ctx) + + return carrier_text_map + + +def context_from_request( + request: Request, +) -> Optional["opentelemetry.context.context.Context"]: + """Extract an opentelemetry context from the headers on an HTTP request + + This is useful when we have received an HTTP request from another part of our + system, and want to link our spans to those of the remote system. + """ + if not opentelemetry: + return None + header_dict = { + k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() + } + + # Extract all of the relevant values from the headers to construct a + # SpanContext to return. + return extract_text_map(header_dict) + + +@only_if_tracing +def extract_text_map( + carrier: Dict[str, str] +) -> Optional["opentelemetry.context.context.Context"]: + """ + Wrapper method for opentelemetry's propagator.extract for TEXT_MAP. + Args: + carrier: a dict possibly containing a context. + + Returns: + The active context extracted from carrier. + """ + propagator = opentelemetry.propagate.get_global_textmap() + # Extract all of the relevant values from the `carrier` to construct a + # Context to return. + return propagator.extract(carrier) + + +# Tracing decorators + + +def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]: + """ + Decorator to trace a function with a custom opname. + + See the module's doc string for usage examples. 
+ + """ + + def decorator(func: Callable[P, R]) -> Callable[P, R]: + if opentelemetry is None: + return func # type: ignore[unreachable] + + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R: + with start_active_span(opname): + return await func(*args, **kwargs) # type: ignore[misc] + + else: + # The other case here handles both sync functions and those + # decorated with inlineDeferred. + @wraps(func) + def _trace_inner(*args: P.args, **kwargs: P.kwargs) -> R: + scope = start_active_span(opname) + scope.__enter__() + + try: + result = func(*args, **kwargs) + if isinstance(result, defer.Deferred): + + def call_back(result: R) -> R: + scope.__exit__(None, None, None) + return result + + def err_back(result: R) -> R: + scope.__exit__(None, None, None) + return result + + result.addCallbacks(call_back, err_back) + + else: + if inspect.isawaitable(result): + logger.error( + "@trace may not have wrapped %s correctly! " + "The function is not async but returned a %s.", + func.__qualname__, + type(result).__name__, + ) + + scope.__exit__(None, None, None) + + return result + + except Exception as e: + scope.__exit__(type(e), None, e.__traceback__) + raise + + return _trace_inner # type: ignore[return-value] + + return decorator + + +def trace(func: Callable[P, R]) -> Callable[P, R]: + """ + Decorator to trace a function. + + Sets the operation name to that of the function's name. + + See the module's doc string for usage examples. + """ + + return trace_with_opname(func.__name__)(func) + + +def tag_args(func: Callable[P, R]) -> Callable[P, R]: + """ + Tags all of the args to the active span. 
+ """ + + if not opentelemetry: + return func + + @wraps(func) + def _tag_args_inner(*args: P.args, **kwargs: P.kwargs) -> R: + argspec = inspect.getfullargspec(func) + for i, arg in enumerate(argspec.args[1:]): + set_attribute("ARG_" + arg, str(args[i])) # type: ignore[index] + set_attribute("args", str(args[len(argspec.args) :])) # type: ignore[index] + set_attribute("kwargs", str(kwargs)) + return func(*args, **kwargs) + + return _tag_args_inner + + +@contextlib.contextmanager +def trace_servlet( + request: "SynapseRequest", extract_context: bool = False +) -> Generator[None, None, None]: + """Returns a context manager which traces a request. It starts a span + with some servlet specific tags such as the request metrics name and + request information. + + Args: + request + extract_context: Whether to attempt to extract the tracing + context from the request the servlet is handling. + """ + + if opentelemetry is None: + yield # type: ignore[unreachable] + return + + attrs = { + SynapseTags.REQUEST_ID: request.get_request_id(), + SpanAttributes.HTTP_METHOD: request.get_method(), + SpanAttributes.HTTP_URL: request.get_redacted_uri(), + SpanAttributes.HTTP_HOST: request.getClientAddress().host, + } + + request_name = request.request_metrics.name + tracing_context = context_from_request(request) if extract_context else None + + # This is will end up being the root span for all of servlet traces and we + # aren't able to determine whether to force tracing yet. We can determine + # whether to force trace later in `synapse/api/auth.py`. + with start_active_span( + request_name, + kind=SpanKind.SERVER, + context=tracing_context, + attributes=attrs, + # we configure the span not to finish immediately on exiting the scope, + # and instead pass the span into the SynapseRequest (via + # `request.set_tracing_span(span)`), which will finish it once we've + # finished sending the response to the client. 
+ end_on_exit=False, + ) as span: + request.set_tracing_span(span) + + inject_trace_id_into_response_headers(request.responseHeaders) + try: + yield + finally: + # We set the operation name again in case its changed (which happens + # with JsonResource). + span.update_name(request.request_metrics.name) + + if request.request_metrics.start_context.tag is not None: + span.set_attribute( + SynapseTags.REQUEST_TAG, request.request_metrics.start_context.tag + ) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 7a1516d3a8..59d956dd9d 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py
@@ -42,7 +42,7 @@ from synapse.logging.context import ( LoggingContext, PreserveLoggingContext, ) -from synapse.logging.opentracing import SynapseTags, start_active_span +from synapse.logging.tracing import SynapseTags, start_active_span from synapse.metrics._types import Collector if TYPE_CHECKING: @@ -208,7 +208,7 @@ def run_as_background_process( Args: desc: a description for this background process type func: a function, which may return a Deferred or a coroutine - bg_start_span: Whether to start an opentracing span. Defaults to True. + bg_start_span: Whether to start a tracing span. Defaults to True. Should only be disabled for processes that will not log to or tag a span. args: positional args for func @@ -232,7 +232,7 @@ def run_as_background_process( try: if bg_start_span: ctx = start_active_span( - f"bgproc.{desc}", tags={SynapseTags.REQUEST_ID: str(context)} + f"bgproc.{desc}", + attributes={SynapseTags.REQUEST_ID: str(context)}, ) else: ctx = nullcontext() # type: ignore[assignment] diff --git a/synapse/notifier.py b/synapse/notifier.py
index c42bb8266a..8fd8cb8100 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py
@@ -39,7 +39,7 @@ from synapse.events import EventBase from synapse.handlers.presence import format_user_presence_state from synapse.logging import issue9533_logger from synapse.logging.context import PreserveLoggingContext -from synapse.logging.opentracing import log_kv, start_active_span +from synapse.logging.tracing import log_kv, start_active_span from synapse.metrics import LaterGauge from synapse.streams.config import PaginationConfig from synapse.types import ( @@ -536,7 +536,7 @@ class Notifier: log_kv( { "wait_for_events": "sleep", - "token": prev_token, + "token": str(prev_token), } ) @@ -546,7 +546,7 @@ class Notifier: log_kv( { "wait_for_events": "woken", - "token": user_stream.current_token, + "token": str(user_stream.current_token), } ) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index e96fb45e9f..11299367d2 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py
@@ -23,7 +23,7 @@ from twisted.internet.interfaces import IDelayedCall from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.logging import opentracing +from synapse.logging import tracing from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import Pusher, PusherConfig, PusherConfigException from synapse.storage.databases.main.event_push_actions import HttpPushAction @@ -198,9 +198,9 @@ class HttpPusher(Pusher): ) for push_action in unprocessed: - with opentracing.start_active_span( + with tracing.start_active_span( "http-push", - tags={ + attributes={ "authenticated_entity": self.user_id, "event_id": push_action.event_id, "app_id": self.app_id, diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 561ad5bf04..af160e31aa 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py
@@ -28,8 +28,8 @@ from synapse.api.errors import HttpResponseException, SynapseError from synapse.http import RequestTimedOutError from synapse.http.server import HttpServer, is_method_cancellable from synapse.http.site import SynapseRequest -from synapse.logging import opentracing -from synapse.logging.opentracing import trace_with_opname +from synapse.logging import tracing +from synapse.logging.tracing import trace_with_opname from synapse.types import JsonDict from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import random_string @@ -248,7 +248,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): # Add an authorization header, if configured. if replication_secret: headers[b"Authorization"] = [b"Bearer " + replication_secret] - opentracing.inject_header_dict(headers, check_destination=False) + tracing.inject_active_tracing_context_into_header_dict( + headers, check_destination=False + ) try: # Keep track of attempts made so we can bail if we don't manage to diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py
index a448dd7eb1..e928fded36 100644 --- a/synapse/replication/tcp/external_cache.py +++ b/synapse/replication/tcp/external_cache.py
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Any, Optional from prometheus_client import Counter, Histogram -from synapse.logging import opentracing +from synapse.logging import tracing from synapse.logging.context import make_deferred_yieldable from synapse.util import json_decoder, json_encoder @@ -94,9 +94,9 @@ class ExternalCache: logger.debug("Caching %s %s: %r", cache_name, key, encoded_value) - with opentracing.start_active_span( + with tracing.start_active_span( "ExternalCache.set", - tags={opentracing.SynapseTags.CACHE_NAME: cache_name}, + attributes={tracing.SynapseTags.CACHE_NAME: cache_name}, ): with response_timer.labels("set").time(): return await make_deferred_yieldable( @@ -113,9 +113,9 @@ class ExternalCache: if self._redis_connection is None: return None - with opentracing.start_active_span( + with tracing.start_active_span( "ExternalCache.get", - tags={opentracing.SynapseTags.CACHE_NAME: cache_name}, + attributes={tracing.SynapseTags.CACHE_NAME: cache_name}, ): with response_timer.labels("get").time(): result = await make_deferred_yieldable( diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index e3f454896a..a592fd2cfb 100644 --- a/synapse/rest/client/keys.py +++ b/synapse/rest/client/keys.py
@@ -26,7 +26,7 @@ from synapse.http.servlet import ( parse_string, ) from synapse.http.site import SynapseRequest -from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname +from synapse.logging.tracing import log_kv, set_attribute, trace_with_opname from synapse.types import JsonDict, StreamToken from ._base import client_patterns, interactive_auth_handler @@ -88,7 +88,7 @@ class KeyUploadServlet(RestServlet): user_id ) if dehydrated_device is not None and device_id != dehydrated_device[0]: - set_tag("error", True) + set_attribute("error", True) log_kv( { "message": "Client uploading keys for a different device", @@ -204,13 +204,13 @@ class KeyChangesServlet(RestServlet): requester = await self.auth.get_user_by_req(request, allow_guest=True) from_token_string = parse_string(request, "from", required=True) - set_tag("from", from_token_string) + set_attribute("from", from_token_string) # We want to enforce they do pass us one, but we ignore it and return # changes after the "to" as well as before. # # XXX This does not enforce that "to" is passed. - set_tag("to", str(parse_string(request, "to"))) + set_attribute("to", str(parse_string(request, "to"))) from_token = await StreamToken.from_string(self.store, from_token_string) diff --git a/synapse/rest/client/knock.py b/synapse/rest/client/knock.py
index ad025c8a45..8201f2bd86 100644 --- a/synapse/rest/client/knock.py +++ b/synapse/rest/client/knock.py
@@ -24,7 +24,7 @@ from synapse.http.servlet import ( parse_strings_from_args, ) from synapse.http.site import SynapseRequest -from synapse.logging.opentracing import set_tag +from synapse.logging.tracing import set_attribute from synapse.rest.client.transactions import HttpTransactionCache from synapse.types import JsonDict, RoomAlias, RoomID @@ -97,7 +97,7 @@ class KnockRoomAliasServlet(RestServlet): def on_PUT( self, request: SynapseRequest, room_identifier: str, txn_id: str ) -> Awaitable[Tuple[int, JsonDict]]: - set_tag("txn_id", txn_id) + set_attribute("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self.on_POST, request, room_identifier, txn_id diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 2f513164cb..3880846e9a 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py
@@ -46,7 +46,7 @@ from synapse.http.servlet import ( parse_strings_from_args, ) from synapse.http.site import SynapseRequest -from synapse.logging.opentracing import set_tag +from synapse.logging.tracing import set_attribute from synapse.rest.client._base import client_patterns from synapse.rest.client.transactions import HttpTransactionCache from synapse.storage.state import StateFilter @@ -82,7 +82,7 @@ class RoomCreateRestServlet(TransactionRestServlet): def on_PUT( self, request: SynapseRequest, txn_id: str ) -> Awaitable[Tuple[int, JsonDict]]: - set_tag("txn_id", txn_id) + set_attribute("txn_id", txn_id) return self.txns.fetch_or_execute_request(request, self.on_POST, request) async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: @@ -194,7 +194,7 @@ class RoomStateEventRestServlet(TransactionRestServlet): requester = await self.auth.get_user_by_req(request, allow_guest=True) if txn_id: - set_tag("txn_id", txn_id) + set_attribute("txn_id", txn_id) content = parse_json_object_from_request(request) @@ -229,7 +229,7 @@ class RoomStateEventRestServlet(TransactionRestServlet): except ShadowBanError: event_id = "$" + random_string(43) - set_tag("event_id", event_id) + set_attribute("event_id", event_id) ret = {"event_id": event_id} return 200, ret @@ -279,7 +279,7 @@ class RoomSendEventRestServlet(TransactionRestServlet): except ShadowBanError: event_id = "$" + random_string(43) - set_tag("event_id", event_id) + set_attribute("event_id", event_id) return 200, {"event_id": event_id} def on_GET( @@ -290,7 +290,7 @@ class RoomSendEventRestServlet(TransactionRestServlet): def on_PUT( self, request: SynapseRequest, room_id: str, event_type: str, txn_id: str ) -> Awaitable[Tuple[int, JsonDict]]: - set_tag("txn_id", txn_id) + set_attribute("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self.on_POST, request, room_id, event_type, txn_id @@ -348,7 +348,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet): def 
on_PUT( self, request: SynapseRequest, room_identifier: str, txn_id: str ) -> Awaitable[Tuple[int, JsonDict]]: - set_tag("txn_id", txn_id) + set_attribute("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self.on_POST, request, room_identifier, txn_id @@ -816,7 +816,7 @@ class RoomForgetRestServlet(TransactionRestServlet): def on_PUT( self, request: SynapseRequest, room_id: str, txn_id: str ) -> Awaitable[Tuple[int, JsonDict]]: - set_tag("txn_id", txn_id) + set_attribute("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self.on_POST, request, room_id, txn_id @@ -916,7 +916,7 @@ class RoomMembershipRestServlet(TransactionRestServlet): def on_PUT( self, request: SynapseRequest, room_id: str, membership_action: str, txn_id: str ) -> Awaitable[Tuple[int, JsonDict]]: - set_tag("txn_id", txn_id) + set_attribute("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self.on_POST, request, room_id, membership_action, txn_id @@ -962,13 +962,13 @@ class RoomRedactEventRestServlet(TransactionRestServlet): except ShadowBanError: event_id = "$" + random_string(43) - set_tag("event_id", event_id) + set_attribute("event_id", event_id) return 200, {"event_id": event_id} def on_PUT( self, request: SynapseRequest, room_id: str, event_id: str, txn_id: str ) -> Awaitable[Tuple[int, JsonDict]]: - set_tag("txn_id", txn_id) + set_attribute("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self.on_POST, request, room_id, event_id, txn_id diff --git a/synapse/rest/client/sendtodevice.py b/synapse/rest/client/sendtodevice.py
index 1a8e9a96d4..a3f8fdb317 100644 --- a/synapse/rest/client/sendtodevice.py +++ b/synapse/rest/client/sendtodevice.py
@@ -19,7 +19,7 @@ from synapse.http import servlet from synapse.http.server import HttpServer from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request from synapse.http.site import SynapseRequest -from synapse.logging.opentracing import set_tag, trace_with_opname +from synapse.logging.tracing import set_attribute, trace_with_opname from synapse.rest.client.transactions import HttpTransactionCache from synapse.types import JsonDict @@ -47,8 +47,8 @@ class SendToDeviceRestServlet(servlet.RestServlet): def on_PUT( self, request: SynapseRequest, message_type: str, txn_id: str ) -> Awaitable[Tuple[int, JsonDict]]: - set_tag("message_type", message_type) - set_tag("txn_id", txn_id) + set_attribute("message_type", message_type) + set_attribute("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self._put, request, message_type, txn_id ) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index c2989765ce..5ddb08eb2f 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py
@@ -37,7 +37,7 @@ from synapse.handlers.sync import ( from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string from synapse.http.site import SynapseRequest -from synapse.logging.opentracing import trace_with_opname +from synapse.logging.tracing import trace_with_opname from synapse.types import JsonDict, StreamToken from synapse.util import json_decoder diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index cf98b0ab48..f34c067515 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py
@@ -45,8 +45,8 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext -from synapse.logging import opentracing from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable +from synapse.logging.tracing import Link, get_active_span, start_active_span, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.controllers.state import StateStorageController from synapse.storage.databases import Databases @@ -118,7 +118,7 @@ times_pruned_extremities = Counter( class _PersistEventsTask: """A batch of events to persist.""" - name: ClassVar[str] = "persist_event_batch" # used for opentracing + name: ClassVar[str] = "persist_event_batch" # used for tracing events_and_contexts: List[Tuple[EventBase, EventContext]] backfilled: bool @@ -139,7 +139,7 @@ class _PersistEventsTask: class _UpdateCurrentStateTask: """A room whose current state needs recalculating.""" - name: ClassVar[str] = "update_current_state" # used for opentracing + name: ClassVar[str] = "update_current_state" # used for tracing def try_merge(self, task: "_EventPersistQueueTask") -> bool: """Deduplicates consecutive recalculations of current state.""" @@ -154,11 +154,11 @@ class _EventPersistQueueItem: task: _EventPersistQueueTask deferred: ObservableDeferred - parent_opentracing_span_contexts: List = attr.ib(factory=list) - """A list of opentracing spans waiting for this batch""" + parent_tracing_span_contexts: List = attr.ib(factory=list) + """A list of tracing spans waiting for this batch""" - opentracing_span_context: Any = None - """The opentracing span under which the persistence actually happened""" + tracing_span_context: Any = None + """The tracing span under which the persistence actually happened""" _PersistResult = TypeVar("_PersistResult") @@ -222,10 +222,10 @@ class 
_EventPeristenceQueue(Generic[_PersistResult]): ) queue.append(end_item) - # also add our active opentracing span to the item so that we get a link back - span = opentracing.active_span() + # also add our active tracing span to the item so that we get a link back + span = get_active_span() if span: - end_item.parent_opentracing_span_contexts.append(span.context) + end_item.parent_tracing_span_contexts.append(span.get_span_context()) # start a processor for the queue, if there isn't one already self._handle_queue(room_id) @@ -233,9 +233,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]): # wait for the queue item to complete res = await make_deferred_yieldable(end_item.deferred.observe()) - # add another opentracing span which links to the persist trace. - with opentracing.start_active_span_follows_from( - f"{task.name}_complete", (end_item.opentracing_span_context,) + # add another tracing span which links to the persist trace. + with start_active_span( + f"{task.name}_complete", + links=[Link(end_item.tracing_span_context)], ): pass @@ -266,13 +267,15 @@ class _EventPeristenceQueue(Generic[_PersistResult]): queue = self._get_drainining_queue(room_id) for item in queue: try: - with opentracing.start_active_span_follows_from( + with start_active_span( item.task.name, - item.parent_opentracing_span_contexts, - inherit_force_tracing=True, - ) as scope: - if scope: - item.opentracing_span_context = scope.span.context + links=[ + Link(span_context) + for span_context in item.parent_tracing_span_contexts + ], + ) as span: + if span: + item.tracing_span_context = span.get_span_context() ret = await self._per_item_callback(room_id, item.task) except Exception: @@ -355,7 +358,7 @@ class EventsPersistenceStorageController: f"Found an unexpected task type in event persistence queue: {task}" ) - @opentracing.trace + @trace async def persist_events( self, events_and_contexts: Iterable[Tuple[EventBase, EventContext]], @@ -418,7 +421,7 @@ class 
EventsPersistenceStorageController: self.main_store.get_room_max_token(), ) - @opentracing.trace + @trace async def persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]: diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b394a6658b..ca0f606797 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py
@@ -47,7 +47,7 @@ from twisted.internet.interfaces import IReactorCore from synapse.api.errors import StoreError from synapse.config.database import DatabaseConnectionConfig -from synapse.logging import opentracing +from synapse.logging import tracing from synapse.logging.context import ( LoggingContext, current_context, @@ -422,11 +422,11 @@ class LoggingTransaction: start = time.time() try: - with opentracing.start_active_span( + with tracing.start_active_span( "db.query", - tags={ - opentracing.tags.DATABASE_TYPE: "sql", - opentracing.tags.DATABASE_STATEMENT: one_line_sql, + attributes={ + tracing.SpanAttributes.DB_SYSTEM: "sql", + tracing.SpanAttributes.DB_STATEMENT: one_line_sql, }, ): return func(sql, *args, **kwargs) @@ -701,15 +701,15 @@ class DatabasePool: exception_callbacks=exception_callbacks, ) try: - with opentracing.start_active_span( + with tracing.start_active_span( "db.txn", - tags={ - opentracing.SynapseTags.DB_TXN_DESC: desc, - opentracing.SynapseTags.DB_TXN_ID: name, + attributes={ + tracing.SynapseTags.DB_TXN_DESC: desc, + tracing.SynapseTags.DB_TXN_ID: name, }, ): r = func(cursor, *args, **kwargs) - opentracing.log_kv({"message": "commit"}) + tracing.log_kv({"message": "commit"}) conn.commit() return r except self.engine.module.OperationalError as e: @@ -725,7 +725,7 @@ class DatabasePool: if i < N: i += 1 try: - with opentracing.start_active_span("db.rollback"): + with tracing.start_active_span("db.rollback"): conn.rollback() except self.engine.module.Error as e1: transaction_logger.warning("[TXN EROLL] {%s} %s", name, e1) @@ -739,7 +739,7 @@ class DatabasePool: if i < N: i += 1 try: - with opentracing.start_active_span("db.rollback"): + with tracing.start_active_span("db.rollback"): conn.rollback() except self.engine.module.Error as e1: transaction_logger.warning( @@ -845,7 +845,7 @@ class DatabasePool: logger.warning("Starting db txn '%s' from sentinel context", desc) try: - with opentracing.start_active_span(f"db.{desc}"): + with 
tracing.start_active_span(f"db.{desc}"): result = await self.runWithConnection( self.new_transaction, desc, @@ -928,9 +928,7 @@ class DatabasePool: with LoggingContext( str(curr_context), parent_context=parent_context ) as context: - with opentracing.start_active_span( - operation_name="db.connection", - ): + with tracing.start_active_span("db.connection"): sched_duration_sec = monotonic_time() - start_time sql_scheduling_timer.observe(sched_duration_sec) context.add_database_scheduled(sched_duration_sec) @@ -944,15 +942,13 @@ class DatabasePool: "Reconnecting database connection over transaction limit" ) conn.reconnect() - opentracing.log_kv( - {"message": "reconnected due to txn limit"} - ) + tracing.log_kv({"message": "reconnected due to txn limit"}) self._txn_counters[tid] = 1 if self.engine.is_connection_closed(conn): logger.debug("Reconnecting closed database connection") conn.reconnect() - opentracing.log_kv({"message": "reconnected"}) + tracing.log_kv({"message": "reconnected"}) if self._txn_limit > 0: self._txn_counters[tid] = 1 diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 73c95ffb6f..1503d74b1f 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py
@@ -27,7 +27,7 @@ from typing import ( ) from synapse.logging import issue9533_logger -from synapse.logging.opentracing import log_kv, set_tag, trace +from synapse.logging.tracing import log_kv, set_attribute, trace from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( @@ -436,7 +436,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): (user_id, device_id), None ) - set_tag("last_deleted_stream_id", str(last_deleted_stream_id)) + set_attribute("last_deleted_stream_id", str(last_deleted_stream_id)) if last_deleted_stream_id: has_changed = self._device_inbox_stream_cache.has_entity_changed( @@ -485,10 +485,10 @@ class DeviceInboxWorkerStore(SQLBaseStore): A list of messages for the device and where in the stream the messages got to. """ - set_tag("destination", destination) - set_tag("last_stream_id", last_stream_id) - set_tag("current_stream_id", current_stream_id) - set_tag("limit", limit) + set_attribute("destination", destination) + set_attribute("last_stream_id", last_stream_id) + set_attribute("current_stream_id", current_stream_id) + set_attribute("limit", limit) has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( destination, last_stream_id diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index ca0fe8c4be..7ceb7a202b 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -30,11 +30,11 @@ from typing import ( ) from typing_extensions import Literal -from synapse.api.constants import EduTypes +from synapse.api.constants import EduTypes, EventContentFields from synapse.api.errors import Codes, StoreError -from synapse.logging.opentracing import ( +from synapse.logging.tracing import ( get_active_span_text_map, - set_tag, + set_attribute, trace, whitelisted_homeserver, ) @@ -333,12 +333,12 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): # (user_id, device_id) entries into a map, with the value being # the max stream_id across each set of duplicate entries # - # maps (user_id, device_id) -> (stream_id, opentracing_context) + # maps (user_id, device_id) -> (stream_id, tracing_context) # - # opentracing_context contains the opentracing metadata for the request + # tracing_context contains the opentelemetry metadata for the request # that created the poke # - # The most recent request's opentracing_context is used as the + # The most recent request's tracing_context is used as the # context which created the Edu. # This is the stream ID that we will return for the consumer to resume @@ -401,8 +401,8 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): if update_stream_id > previous_update_stream_id: # FIXME If this overwrites an older update, this discards the - # previous OpenTracing context. - # It might make it harder to track down issues using OpenTracing. + # previous tracing context. + # It might make it harder to track down issues using tracing. # If there's a good reason why it doesn't matter, a comment here # about that would not hurt. 
query_map[key] = (update_stream_id, update_context) @@ -468,11 +468,11 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): - user_id - device_id - stream_id - - opentracing_context + - tracing_context """ # get the list of device updates that need to be sent sql = """ - SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes + SELECT user_id, device_id, stream_id, tracing_context FROM device_lists_outbound_pokes WHERE destination = ? AND ? < stream_id AND stream_id <= ? ORDER BY stream_id LIMIT ? @@ -493,7 +493,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): destination: The host the device updates are intended for from_stream_id: The minimum stream_id to filter updates by, exclusive query_map: Dictionary mapping (user_id, device_id) to - (update stream_id, the relevant json-encoded opentracing context) + (update stream_id, the relevant json-encoded tracing context) Returns: List of objects representing a device update EDU. @@ -531,13 +531,13 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): for device_id in device_ids: device = user_devices[device_id] - stream_id, opentracing_context = query_map[(user_id, device_id)] + stream_id, tracing_context = query_map[(user_id, device_id)] result = { "user_id": user_id, "device_id": device_id, "prev_id": [prev_id] if prev_id else [], "stream_id": stream_id, - "org.matrix.opentracing_context": opentracing_context, + EventContentFields.TRACING_CONTEXT: tracing_context, } prev_id = stream_id @@ -706,8 +706,8 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): else: results[user_id] = await self.get_cached_devices_for_user(user_id) - set_tag("in_cache", str(results)) - set_tag("not_in_cache", str(user_ids_not_in_cache)) + set_attribute("in_cache", str(results)) + set_attribute("not_in_cache", str(user_ids_not_in_cache)) return user_ids_not_in_cache, results @@ -1801,7 +1801,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): "device_id", "sent", "ts", - 
"opentracing_context", + "tracing_context", ), values=[ ( @@ -1846,7 +1846,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): "room_id", "stream_id", "converted_to_destinations", - "opentracing_context", + "tracing_context", ), values=[ ( @@ -1870,11 +1870,11 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): written to `device_lists_outbound_pokes`. Returns: - A list of user ID, device ID, room ID, stream ID and optional opentracing context. + A list of user ID, device ID, room ID, stream ID and optional opentelemetry context. """ sql = """ - SELECT user_id, device_id, room_id, stream_id, opentracing_context + SELECT user_id, device_id, room_id, stream_id, tracing_context FROM device_lists_changes_in_room WHERE NOT converted_to_destinations ORDER BY stream_id @@ -1892,9 +1892,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): device_id, room_id, stream_id, - db_to_json(opentracing_context), + db_to_json(tracing_context), ) - for user_id, device_id, room_id, stream_id, opentracing_context in txn + for user_id, device_id, room_id, stream_id, tracing_context in txn ] return await self.db_pool.runInteraction( diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index af59be6b48..6d565102ac 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -18,7 +18,7 @@ from typing import Dict, Iterable, Mapping, Optional, Tuple, cast from typing_extensions import Literal, TypedDict from synapse.api.errors import StoreError -from synapse.logging.opentracing import log_kv, trace +from synapse.logging.tracing import log_kv, trace from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import LoggingTransaction from synapse.types import JsonDict, JsonSerializable, StreamKeyType diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 46c0d06157..2df8101390 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -36,7 +36,7 @@ from synapse.appservice import ( TransactionOneTimeKeyCounts, TransactionUnusedFallbackKeys, ) -from synapse.logging.opentracing import log_kv, set_tag, trace +from synapse.logging.tracing import log_kv, set_attribute, trace from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( DatabasePool, @@ -146,7 +146,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker key data. The key data will be a dict in the same format as the DeviceKeys type returned by POST /_matrix/client/r0/keys/query. """ - set_tag("query_list", str(query_list)) + set_attribute("query_list", str(query_list)) if not query_list: return {} @@ -228,8 +228,8 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker Dict mapping from user-id to dict mapping from device_id to key data. """ - set_tag("include_all_devices", include_all_devices) - set_tag("include_deleted_devices", include_deleted_devices) + set_attribute("include_all_devices", include_all_devices) + set_attribute("include_deleted_devices", include_deleted_devices) result = await self.db_pool.runInteraction( "get_e2e_device_keys", @@ -416,9 +416,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker """ def _add_e2e_one_time_keys(txn: LoggingTransaction) -> None: - set_tag("user_id", user_id) - set_tag("device_id", device_id) - set_tag("new_keys", str(new_keys)) + set_attribute("user_id", user_id) + set_attribute("device_id", device_id) + set_attribute("new_keys", str(new_keys)) # We are protected from race between lookup and insertion due to # a unique constraint. 
If there is a race of two calls to # `add_e2e_one_time_keys` then they'll conflict and we will only @@ -1158,10 +1158,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): """ def _set_e2e_device_keys_txn(txn: LoggingTransaction) -> bool: - set_tag("user_id", user_id) - set_tag("device_id", device_id) - set_tag("time_now", time_now) - set_tag("device_keys", str(device_keys)) + set_attribute("user_id", user_id) + set_attribute("device_id", device_id) + set_attribute("time_now", time_now) + set_attribute("device_keys", str(device_keys)) old_key_json = self.db_pool.simple_select_one_onecol_txn( txn, diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index a9a88c8bfd..dd187f7422 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py
@@ -75,6 +75,8 @@ Changes in SCHEMA_VERSION = 71: Changes in SCHEMA_VERSION = 72: - event_edges.(room_id, is_state) are no longer written to. - Tables related to groups are dropped. + - Rename column in `device_lists_outbound_pokes` and `device_lists_changes_in_room` + from `opentracing_context` to generalized `tracing_context`. """ diff --git a/synapse/storage/schema/main/delta/72/04rename_opentelemetry_tracing_context.sql b/synapse/storage/schema/main/delta/72/04rename_opentelemetry_tracing_context.sql new file mode 100644
index 0000000000..ae904863f8 --- /dev/null +++ b/synapse/storage/schema/main/delta/72/04rename_opentelemetry_tracing_context.sql
@@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Rename to generalized `tracing_context` since we're moving from opentracing to opentelemetry +ALTER TABLE device_lists_outbound_pokes RENAME COLUMN opentracing_context TO tracing_context; +ALTER TABLE device_lists_changes_in_room RENAME COLUMN opentracing_context TO tracing_context; diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index a3eb5f741b..1dd2d3e62e 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py
@@ -29,11 +29,7 @@ import attr from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.logging.opentracing import ( - active_span, - start_active_span, - start_active_span_follows_from, -) +from synapse.logging.tracing import Link, get_active_span, start_active_span from synapse.util import Clock from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred from synapse.util.caches import register_cache @@ -41,7 +37,7 @@ from synapse.util.caches import register_cache logger = logging.getLogger(__name__) if TYPE_CHECKING: - import opentracing + import opentelemetry # the type of the key in the cache KV = TypeVar("KV") @@ -82,8 +78,8 @@ class ResponseCacheEntry: easier to cache Failure results. """ - opentracing_span_context: "Optional[opentracing.SpanContext]" - """The opentracing span which generated/is generating the result""" + tracing_span_context: Optional["opentelemetry.trace.SpanContext"] + """The tracing span which generated/is generating the result""" class ResponseCache(Generic[KV]): @@ -141,7 +137,7 @@ class ResponseCache(Generic[KV]): self, context: ResponseCacheContext[KV], deferred: "defer.Deferred[RV]", - opentracing_span_context: "Optional[opentracing.SpanContext]", + tracing_span_context: Optional["opentelemetry.trace.SpanContext"], ) -> ResponseCacheEntry: """Set the entry for the given key to the given deferred. @@ -152,14 +148,14 @@ class ResponseCache(Generic[KV]): Args: context: Information about the cache miss deferred: The deferred which resolves to the result. - opentracing_span_context: An opentracing span wrapping the calculation + tracing_span_context: A tracing span wrapping the calculation Returns: The cache entry object. 
""" result = ObservableDeferred(deferred, consumeErrors=True) key = context.cache_key - entry = ResponseCacheEntry(result, opentracing_span_context) + entry = ResponseCacheEntry(result, tracing_span_context) self._result_cache[key] = entry def on_complete(r: RV) -> RV: @@ -234,15 +230,15 @@ class ResponseCache(Generic[KV]): if cache_context: kwargs["cache_context"] = context - span_context: Optional[opentracing.SpanContext] = None + span_context: Optional["opentelemetry.trace.SpanContext"] = None async def cb() -> RV: # NB it is important that we do not `await` before setting span_context! nonlocal span_context with start_active_span(f"ResponseCache[{self._name}].calculate"): - span = active_span() + span = get_active_span() if span: - span_context = span.context + span_context = span.get_span_context() return await callback(*args, **kwargs) d = run_in_background(cb) @@ -257,9 +253,9 @@ class ResponseCache(Generic[KV]): "[%s]: using incomplete cached result for [%s]", self._name, key ) - span_context = entry.opentracing_span_context - with start_active_span_follows_from( + span_context = entry.tracing_span_context + with start_active_span( f"ResponseCache[{self._name}].wait", - contexts=(span_context,) if span_context else (), + links=[Link(span_context)] if span_context else None, ): return await make_deferred_yieldable(result)