summary refs log tree commit diff
path: root/synapse/storage/controllers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/controllers')
-rw-r--r--synapse/storage/controllers/persist_events.py54
-rw-r--r--synapse/storage/controllers/state.py2
2 files changed, 30 insertions, 26 deletions
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py

index dad3731b9b..bb14729a9d 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py
@@ -46,11 +46,12 @@ from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable -from synapse.logging.opentracing import ( +from synapse.logging.tracing import ( + Link, SynapseTags, - active_span, - set_tag, - start_active_span_follows_from, + get_active_span, + set_attribute, + start_active_span, trace, ) from synapse.metrics.background_process_metrics import run_as_background_process @@ -124,7 +125,7 @@ times_pruned_extremities = Counter( class _PersistEventsTask: """A batch of events to persist.""" - name: ClassVar[str] = "persist_event_batch" # used for opentracing + name: ClassVar[str] = "persist_event_batch" # used for tracing events_and_contexts: List[Tuple[EventBase, EventContext]] backfilled: bool @@ -145,7 +146,7 @@ class _PersistEventsTask: class _UpdateCurrentStateTask: """A room whose current state needs recalculating.""" - name: ClassVar[str] = "update_current_state" # used for opentracing + name: ClassVar[str] = "update_current_state" # used for tracing def try_merge(self, task: "_EventPersistQueueTask") -> bool: """Deduplicates consecutive recalculations of current state.""" @@ -160,11 +161,11 @@ class _EventPersistQueueItem: task: _EventPersistQueueTask deferred: ObservableDeferred - parent_opentracing_span_contexts: List = attr.ib(factory=list) - """A list of opentracing spans waiting for this batch""" + parent_tracing_span_contexts: List = attr.ib(factory=list) + """A list of tracing spans waiting for this batch""" - opentracing_span_context: Any = None - """The opentracing span under which the persistence actually happened""" + tracing_span_context: Any = None + """The tracing span under which the persistence actually happened""" _PersistResult = TypeVar("_PersistResult") @@ -228,10 +229,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]): ) queue.append(end_item) - # also add our active opentracing span to the item so that we get a link back - span = active_span() + # also add our active tracing span to the item so that we get a link back + span = get_active_span() if span: - end_item.parent_opentracing_span_contexts.append(span.context) + end_item.parent_tracing_span_contexts.append(span.get_span_context()) # start a processor for the queue, if there isn't one already self._handle_queue(room_id) @@ -239,9 +240,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]): # wait for the queue item to complete res = await make_deferred_yieldable(end_item.deferred.observe()) - # add another opentracing span which links to the persist trace. - with start_active_span_follows_from( - f"{task.name}_complete", (end_item.opentracing_span_context,) + # add another tracing span which links to the persist trace. + with start_active_span( + f"{task.name}_complete", + links=[Link(end_item.tracing_span_context)], ): pass @@ -272,13 +274,15 @@ class _EventPeristenceQueue(Generic[_PersistResult]): queue = self._get_drainining_queue(room_id) for item in queue: try: - with start_active_span_follows_from( + with start_active_span( item.task.name, - item.parent_opentracing_span_contexts, - inherit_force_tracing=True, - ) as scope: - if scope: - item.opentracing_span_context = scope.span.context + links=[ + Link(span_context) + for span_context in item.parent_tracing_span_contexts + ], + ) as span: + if span: + item.tracing_span_context = span.get_span_context() ret = await self._per_item_callback(room_id, item.task) except Exception: @@ -392,15 +396,15 @@ class EventsPersistenceStorageController: partitioned.setdefault(event.room_id, []).append((event, ctx)) event_ids.append(event.event_id) - set_tag( + set_attribute( SynapseTags.FUNC_ARG_PREFIX + "event_ids", str(event_ids), ) - set_tag( + set_attribute( SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", str(len(event_ids)), ) - set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) async def enqueue( item: Tuple[str, List[Tuple[EventBase, EventContext]]] diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index f9ffd0e29e..5964835ea3 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py
@@ -29,7 +29,7 @@ from typing import ( from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.logging.opentracing import tag_args, trace +from synapse.logging.tracing import tag_args, trace from synapse.storage.roommember import ProfileInfo from synapse.storage.state import StateFilter from synapse.storage.util.partial_state_events_tracker import (