diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index c11f6c5845..dc38942bb1 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -18,6 +18,7 @@ import itertools
import logging
from collections import deque
from typing import (
+ Any,
Awaitable,
Callable,
Collection,
@@ -40,6 +41,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.logging import opentracing
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases import Databases
@@ -103,12 +105,18 @@ times_pruned_extremities = Counter(
)
-@attr.s(auto_attribs=True, frozen=True, slots=True)
+@attr.s(auto_attribs=True, slots=True)
class _EventPersistQueueItem:
events_and_contexts: List[Tuple[EventBase, EventContext]]
backfilled: bool
deferred: ObservableDeferred
+ parent_opentracing_span_contexts: List = attr.ib(factory=list)
+ """A list of opentracing span contexts waiting for this batch"""
+
+ opentracing_span_context: Any = None
+ """The opentracing span under which the persistence actually happened"""
+
_PersistResult = TypeVar("_PersistResult")
@@ -171,9 +179,27 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
)
queue.append(end_item)
+ # add our events to the queue item
end_item.events_and_contexts.extend(events_and_contexts)
+
+ # also add our active opentracing span to the item so that we get a link back
+ span = opentracing.active_span()
+ if span:
+ end_item.parent_opentracing_span_contexts.append(span.context)
+
+ # start a processor for the queue, if there isn't one already
self._handle_queue(room_id)
- return await make_deferred_yieldable(end_item.deferred.observe())
+
+ # wait for the queue item to complete
+ res = await make_deferred_yieldable(end_item.deferred.observe())
+
+ # add another opentracing span which links to the persist trace.
+ with opentracing.start_active_span_follows_from(
+ "persist_event_batch_complete", (end_item.opentracing_span_context,)
+ ):
+ pass
+
+ return res
def _handle_queue(self, room_id):
"""Attempts to handle the queue for a room if not already being handled.
@@ -200,9 +226,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
- ret = await self._per_item_callback(
- item.events_and_contexts, item.backfilled
- )
+ with opentracing.start_active_span_follows_from(
+ "persist_event_batch",
+ item.parent_opentracing_span_contexts,
+ inherit_force_tracing=True,
+ ) as scope:
+ if scope:
+ item.opentracing_span_context = scope.span.context
+
+ ret = await self._per_item_callback(
+ item.events_and_contexts, item.backfilled
+ )
except Exception:
with PreserveLoggingContext():
item.deferred.errback()
@@ -252,6 +286,7 @@ class EventsPersistenceStorage:
self._event_persist_queue = _EventPeristenceQueue(self._persist_event_batch)
self._state_resolution_handler = hs.get_state_resolution_handler()
+ @opentracing.trace
async def persist_events(
self,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
@@ -307,6 +342,7 @@ class EventsPersistenceStorage:
self.main_store.get_room_max_token(),
)
+ @opentracing.trace
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
|