summary refs log tree commit diff
path: root/synapse/storage/persist_events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/persist_events.py')
-rw-r--r--synapse/storage/persist_events.py46
1 files changed, 41 insertions, 5 deletions
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index c11f6c5845..dc38942bb1 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -18,6 +18,7 @@ import itertools
 import logging
 from collections import deque
 from typing import (
+    Any,
     Awaitable,
     Callable,
     Collection,
@@ -40,6 +41,7 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
+from synapse.logging import opentracing
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases import Databases
@@ -103,12 +105,18 @@ times_pruned_extremities = Counter(
 )
 
 
-@attr.s(auto_attribs=True, frozen=True, slots=True)
+@attr.s(auto_attribs=True, slots=True)
 class _EventPersistQueueItem:
     events_and_contexts: List[Tuple[EventBase, EventContext]]
     backfilled: bool
     deferred: ObservableDeferred
 
+    parent_opentracing_span_contexts: List = []
+    """A list of opentracing spans waiting for this batch"""
+
+    opentracing_span_context: Any = None
+    """The opentracing span under which the persistence actually happened"""
+
 
 _PersistResult = TypeVar("_PersistResult")
 
@@ -171,9 +179,27 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
             )
             queue.append(end_item)
 
+        # add our events to the queue item
         end_item.events_and_contexts.extend(events_and_contexts)
+
+        # also add our active opentracing span to the item so that we get a link back
+        span = opentracing.active_span()
+        if span:
+            end_item.parent_opentracing_span_contexts.append(span.context)
+
+        # start a processor for the queue, if there isn't one already
         self._handle_queue(room_id)
-        return await make_deferred_yieldable(end_item.deferred.observe())
+
+        # wait for the queue item to complete
+        res = await make_deferred_yieldable(end_item.deferred.observe())
+
+        # add another opentracing span which links to the persist trace.
+        with opentracing.start_active_span_follows_from(
+            "persist_event_batch_complete", (end_item.opentracing_span_context,)
+        ):
+            pass
+
+        return res
 
     def _handle_queue(self, room_id):
         """Attempts to handle the queue for a room if not already being handled.
@@ -200,9 +226,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
                 queue = self._get_drainining_queue(room_id)
                 for item in queue:
                     try:
-                        ret = await self._per_item_callback(
-                            item.events_and_contexts, item.backfilled
-                        )
+                        with opentracing.start_active_span_follows_from(
+                            "persist_event_batch",
+                            item.parent_opentracing_span_contexts,
+                            inherit_force_tracing=True,
+                        ) as scope:
+                            if scope:
+                                item.opentracing_span_context = scope.span.context
+
+                            ret = await self._per_item_callback(
+                                item.events_and_contexts, item.backfilled
+                            )
                     except Exception:
                         with PreserveLoggingContext():
                             item.deferred.errback()
@@ -252,6 +286,7 @@ class EventsPersistenceStorage:
         self._event_persist_queue = _EventPeristenceQueue(self._persist_event_batch)
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
+    @opentracing.trace
     async def persist_events(
         self,
         events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
@@ -307,6 +342,7 @@ class EventsPersistenceStorage:
             self.main_store.get_room_max_token(),
         )
 
+    @opentracing.trace
     async def persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
     ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]: