diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index cf98b0ab48..f34c067515 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -45,8 +45,8 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
-from synapse.logging import opentracing
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
+from synapse.logging.tracing import Link, get_active_span, start_active_span, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
@@ -118,7 +118,7 @@ times_pruned_extremities = Counter(
class _PersistEventsTask:
"""A batch of events to persist."""
- name: ClassVar[str] = "persist_event_batch" # used for opentracing
+ name: ClassVar[str] = "persist_event_batch" # used for tracing
events_and_contexts: List[Tuple[EventBase, EventContext]]
backfilled: bool
@@ -139,7 +139,7 @@ class _PersistEventsTask:
class _UpdateCurrentStateTask:
"""A room whose current state needs recalculating."""
- name: ClassVar[str] = "update_current_state" # used for opentracing
+ name: ClassVar[str] = "update_current_state" # used for tracing
def try_merge(self, task: "_EventPersistQueueTask") -> bool:
"""Deduplicates consecutive recalculations of current state."""
@@ -154,11 +154,11 @@ class _EventPersistQueueItem:
task: _EventPersistQueueTask
deferred: ObservableDeferred
- parent_opentracing_span_contexts: List = attr.ib(factory=list)
- """A list of opentracing spans waiting for this batch"""
+ parent_tracing_span_contexts: List = attr.ib(factory=list)
+ """A list of tracing spans waiting for this batch"""
- opentracing_span_context: Any = None
- """The opentracing span under which the persistence actually happened"""
+ tracing_span_context: Any = None
+ """The tracing span under which the persistence actually happened"""
_PersistResult = TypeVar("_PersistResult")
@@ -222,10 +222,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
)
queue.append(end_item)
- # also add our active opentracing span to the item so that we get a link back
- span = opentracing.active_span()
+ # also add our active tracing span to the item so that we get a link back
+ span = get_active_span()
if span:
- end_item.parent_opentracing_span_contexts.append(span.context)
+ end_item.parent_tracing_span_contexts.append(span.get_span_context())
# start a processor for the queue, if there isn't one already
self._handle_queue(room_id)
@@ -233,9 +233,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
# wait for the queue item to complete
res = await make_deferred_yieldable(end_item.deferred.observe())
- # add another opentracing span which links to the persist trace.
- with opentracing.start_active_span_follows_from(
- f"{task.name}_complete", (end_item.opentracing_span_context,)
+ # add another tracing span which links to the persist trace.
+ with start_active_span(
+ f"{task.name}_complete",
+ links=[Link(end_item.tracing_span_context)],
):
pass
@@ -266,13 +267,15 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
- with opentracing.start_active_span_follows_from(
+ with start_active_span(
item.task.name,
- item.parent_opentracing_span_contexts,
- inherit_force_tracing=True,
- ) as scope:
- if scope:
- item.opentracing_span_context = scope.span.context
+ links=[
+ Link(span_context)
+ for span_context in item.parent_tracing_span_contexts
+ ],
+ ) as span:
+ if span:
+ item.tracing_span_context = span.get_span_context()
ret = await self._per_item_callback(room_id, item.task)
except Exception:
@@ -355,7 +358,7 @@ class EventsPersistenceStorageController:
f"Found an unexpected task type in event persistence queue: {task}"
)
- @opentracing.trace
+ @trace
async def persist_events(
self,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
@@ -418,7 +421,7 @@ class EventsPersistenceStorageController:
self.main_store.get_room_max_token(),
)
- @opentracing.trace
+ @trace
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
|