summary refs log tree commit diff
path: root/synapse/storage/controllers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/controllers')
-rw-r--r--synapse/storage/controllers/persist_events.py45
1 files changed, 24 insertions, 21 deletions
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py

index cf98b0ab48..f34c067515 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py
@@ -45,8 +45,8 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext -from synapse.logging import opentracing from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable +from synapse.logging.tracing import Link, get_active_span, start_active_span, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.controllers.state import StateStorageController from synapse.storage.databases import Databases @@ -118,7 +118,7 @@ times_pruned_extremities = Counter( class _PersistEventsTask: """A batch of events to persist.""" - name: ClassVar[str] = "persist_event_batch" # used for opentracing + name: ClassVar[str] = "persist_event_batch" # used for tracing events_and_contexts: List[Tuple[EventBase, EventContext]] backfilled: bool @@ -139,7 +139,7 @@ class _PersistEventsTask: class _UpdateCurrentStateTask: """A room whose current state needs recalculating.""" - name: ClassVar[str] = "update_current_state" # used for opentracing + name: ClassVar[str] = "update_current_state" # used for tracing def try_merge(self, task: "_EventPersistQueueTask") -> bool: """Deduplicates consecutive recalculations of current state.""" @@ -154,11 +154,11 @@ class _EventPersistQueueItem: task: _EventPersistQueueTask deferred: ObservableDeferred - parent_opentracing_span_contexts: List = attr.ib(factory=list) - """A list of opentracing spans waiting for this batch""" + parent_tracing_span_contexts: List = attr.ib(factory=list) + """A list of tracing spans waiting for this batch""" - opentracing_span_context: Any = None - """The opentracing span under which the persistence actually happened""" + tracing_span_context: Any = None + """The tracing span under which the persistence actually happened""" _PersistResult = TypeVar("_PersistResult") @@ -222,10 +222,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]): ) queue.append(end_item) - # also add our active opentracing span to the item so that we get a link back - span = opentracing.active_span() + # also add our active tracing span to the item so that we get a link back + span = get_active_span() if span: - end_item.parent_opentracing_span_contexts.append(span.context) + end_item.parent_tracing_span_contexts.append(span.get_span_context()) # start a processor for the queue, if there isn't one already self._handle_queue(room_id) @@ -233,9 +233,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]): # wait for the queue item to complete res = await make_deferred_yieldable(end_item.deferred.observe()) - # add another opentracing span which links to the persist trace. - with opentracing.start_active_span_follows_from( - f"{task.name}_complete", (end_item.opentracing_span_context,) + # add another tracing span which links to the persist trace. + with start_active_span( + f"{task.name}_complete", + links=[Link(end_item.tracing_span_context)], ): pass @@ -266,13 +267,15 @@ class _EventPeristenceQueue(Generic[_PersistResult]): queue = self._get_drainining_queue(room_id) for item in queue: try: - with opentracing.start_active_span_follows_from( + with start_active_span( item.task.name, - item.parent_opentracing_span_contexts, - inherit_force_tracing=True, - ) as scope: - if scope: - item.opentracing_span_context = scope.span.context + links=[ + Link(span_context) + for span_context in item.parent_tracing_span_contexts + ], + ) as span: + if span: + item.tracing_span_context = span.get_span_context() ret = await self._per_item_callback(room_id, item.task) except Exception: @@ -355,7 +358,7 @@ class EventsPersistenceStorageController: f"Found an unexpected task type in event persistence queue: {task}" ) - @opentracing.trace + @trace async def persist_events( self, events_and_contexts: Iterable[Tuple[EventBase, EventContext]], @@ -418,7 +421,7 @@ class EventsPersistenceStorageController: self.main_store.get_room_max_token(), ) - @opentracing.trace + @trace async def persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]: