author     Eric Eastwood <erice@element.io>  2022-08-05 20:44:21 -0500
committer  Eric Eastwood <erice@element.io>  2022-08-05 20:44:21 -0500
commit     13855c59160b472de2eabd5f9363e9cfbf3e3439
tree       598f038605617a6c14792bf1d29893e12d496fe6 /synapse
parent     Trace more
download   synapse-13855c59160b472de2eabd5f9363e9cfbf3e3439.tar.xz
More tracing for the federated side
Diffstat (limited to 'synapse')
-rw-r--r--  synapse/handlers/federation_event.py                | 28
-rw-r--r--  synapse/logging/tracing.py                          | 85
-rw-r--r--  synapse/storage/controllers/persist_events.py       | 17
-rw-r--r--  synapse/storage/databases/main/event_federation.py  |  1
-rw-r--r--  synapse/storage/databases/main/events.py            |  2
-rw-r--r--  synapse/storage/databases/main/events_worker.py     |  5
6 files changed, 97 insertions, 41 deletions
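
The hunks below instrument the federated event-handling path with two decorators from `synapse.logging.tracing`: `@trace`, which starts an active span named after the function, and `@tag_args`, which records the call's arguments as span attributes. A minimal sketch of how they are stacked (the handler class and method here are hypothetical, not part of this commit):

```py
from synapse.logging.tracing import tag_args, trace


class ExampleFederationHandler:
    @trace      # open a span named after the function
    @tag_args   # tag positional args as ARG.<name>, plus "args"/"kwargs"
    async def _fetch_missing_events(self, destination: str, room_id: str) -> None:
        # hypothetical body; the real decorated methods live in
        # synapse/handlers/federation_event.py below
        ...
```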
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 740a04aad6..60436fe497 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -59,7 +59,7 @@ from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.logging.context import nested_logging_context
-from synapse.logging.tracing import trace
+from synapse.logging.tracing import trace, tag_args, set_attribute, SynapseTags
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.replication.http.federation import (
@@ -410,6 +410,7 @@ class FederationEventHandler:
             prev_member_event,
         )
 
+    @trace
     async def process_remote_join(
         self,
         origin: str,
@@ -753,6 +754,7 @@ class FederationEventHandler:
                 await self._process_pulled_event(origin, ev, backfilled=backfilled)
 
     @trace
+    @tag_args
     async def _process_pulled_event(
         self, origin: str, event: EventBase, backfilled: bool
     ) -> None:
@@ -854,6 +856,7 @@ class FederationEventHandler:
             else:
                 raise
 
+    @trace
     async def _compute_event_context_with_maybe_missing_prevs(
         self, dest: str, event: EventBase
     ) -> EventContext:
@@ -970,6 +973,8 @@ class FederationEventHandler:
             event, state_ids_before_event=state_map, partial_state=partial_state
         )
 
+    @trace
+    @tag_args
     async def _get_state_ids_after_missing_prev_event(
         self,
         destination: str,
@@ -1112,6 +1117,8 @@ class FederationEventHandler:
 
         return state_map
 
+    @trace
+    @tag_args
     async def _get_state_and_persist(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1133,6 +1140,7 @@ class FederationEventHandler:
                 destination=destination, room_id=room_id, event_ids=(event_id,)
             )
 
+    @trace
     async def _process_received_pdu(
         self,
         origin: str,
@@ -1283,6 +1291,7 @@ class FederationEventHandler:
         except Exception:
             logger.exception("Failed to resync device for %s", sender)
 
+    @trace
     async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
         """Handles backfilling the insertion event when we receive a marker
         event that points to one.
@@ -1414,6 +1423,8 @@ class FederationEventHandler:
 
         return event_from_response
 
+    @trace
+    @tag_args
     async def _get_events_and_persist(
         self, destination: str, room_id: str, event_ids: Collection[str]
     ) -> None:
@@ -1459,6 +1470,7 @@ class FederationEventHandler:
         logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
         await self._auth_and_persist_outliers(room_id, events)
 
+    @trace
     async def _auth_and_persist_outliers(
         self, room_id: str, events: Iterable[EventBase]
     ) -> None:
@@ -1477,6 +1489,11 @@ class FederationEventHandler:
         """
         event_map = {event.event_id: event for event in events}
 
+        set_attribute(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            event_map.keys(),
+        )
+
         # filter out any events we have already seen. This might happen because
         # the events were eagerly pushed to us (eg, during a room join), or because
         # another thread has raced against us since we decided to request the event.
@@ -1593,6 +1610,7 @@ class FederationEventHandler:
             backfilled=True,
         )
 
+    @trace
     async def _check_event_auth(
         self, origin: Optional[str], event: EventBase, context: EventContext
     ) -> None:
@@ -1631,6 +1649,9 @@ class FederationEventHandler:
         claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
             origin, event
         )
+        set_attribute(
+            "claimed_auth_events", [ev.event_id for ev in claimed_auth_events]
+        )
 
         # ... and check that the event passes auth at those auth events.
         # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1728,6 +1749,7 @@ class FederationEventHandler:
             )
             context.rejected = RejectedReason.AUTH_ERROR
 
+    @trace
     async def _maybe_kick_guest_users(self, event: EventBase) -> None:
         if event.type != EventTypes.GuestAccess:
             return
@@ -1935,6 +1957,8 @@ class FederationEventHandler:
         # instead we raise an AuthError, which will make the caller ignore it.
         raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
 
+    @trace
+    @tag_args
     async def _get_remote_auth_chain_for_event(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1963,6 +1987,7 @@ class FederationEventHandler:
 
         await self._auth_and_persist_outliers(room_id, remote_auth_events)
 
+    @trace
     async def _run_push_actions_and_persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
     ) -> None:
@@ -2008,6 +2033,7 @@ class FederationEventHandler:
             await self._store.remove_push_actions_from_staging(event.event_id)
             raise
 
+    @trace
     async def persist_events_and_notify(
         self,
         room_id: str,
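
Where the useful information is derived from an argument rather than being the argument itself (for example, the IDs of a batch of `EventBase` objects), the hunks above call `set_attribute` on the active span directly instead of relying on `@tag_args`. A sketch of that pattern on a hypothetical helper, mirroring the `_auth_and_persist_outliers` change:

```py
from typing import Iterable

from synapse.events import EventBase
from synapse.logging.tracing import SynapseTags, set_attribute


async def _handle_event_batch(events: Iterable[EventBase]) -> None:
    # Hypothetical helper: record which events this call touched, using the
    # same ARG.-prefixed naming convention that @tag_args uses.
    set_attribute(
        SynapseTags.FUNC_ARG_PREFIX + "event_ids",
        [event.event_id for event in events],
    )
    ...
```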
diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py
index 2f4555cc2e..cb557a147d 100644
--- a/synapse/logging/tracing.py
+++ b/synapse/logging/tracing.py
@@ -281,6 +281,16 @@ class SynapseTags:
     # The name of the external cache
     CACHE_NAME = "cache.name"
 
+    # Used to tag function arguments
+    #
+    # Tag a named arg. The name of the argument should be appended to this
+    # prefix
+    FUNC_ARG_PREFIX = "ARG."
+    # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`)
+    FUNC_ARGS = "args"
+    # Tag keyword args
+    FUNC_KWARGS = "kwargs"
+
 
 class SynapseBaggage:
     FORCE_TRACING = "synapse-force-tracing"
@@ -790,30 +800,28 @@ def extract_text_map(
 # Tracing decorators
 
 
-def create_decorator(
+def _custom_sync_async_decorator(
     func: Callable[P, R],
-    # TODO: What is the correct type for these `Any`? `P.args, P.kwargs` isn't allowed here
     wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]],
 ) -> Callable[P, R]:
     """
-    Creates a decorator that is able to handle sync functions, async functions
-    (coroutines), and inlineDeferred from Twisted.
-
+    Decorates a function that is sync or async (coroutines), or that returns a Twisted
+    `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`.
     Example usage:
     ```py
-    # Decorator to time the functiona and log it out
+    # Decorator to time the function and log it out
     def duration(func: Callable[P, R]) -> Callable[P, R]:
         @contextlib.contextmanager
-        def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs):
+        def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Generator[None, None, None]:
             start_ts = time.time()
-            yield
-            end_ts = time.time()
-            duration = end_ts - start_ts
-            logger.info("%s took %s seconds", func.__name__, duration)
-
-        return create_decorator(func, _wrapping_logic)
+            try:
+                yield
+            finally:
+                end_ts = time.time()
+                duration = end_ts - start_ts
+                logger.info("%s took %s seconds", func.__name__, duration)
+        return _custom_sync_async_decorator(func, _wrapping_logic)
     ```
-
     Args:
         func: The function to be decorated
         wrapping_logic: The business logic of your custom decorator.
@@ -821,14 +829,18 @@ def create_decorator(
             before/after the function as desired.
     """
 
-    @wraps(func)
-    async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
-        if inspect.iscoroutinefunction(func):
+    if inspect.iscoroutinefunction(func):
+
+        @wraps(func)
+        async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
             with wrapping_logic(func, *args, **kwargs):
-                return await func(*args, **kwargs)
-        else:
-            # The other case here handles both sync functions and those
-            # decorated with inlineDeferred.
+                return await func(*args, **kwargs)  # type: ignore[misc]
+
+    else:
+        # The other case here handles both sync functions and those
+        # decorated with inlineDeferred.
+        @wraps(func)
+        def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
             scope = wrapping_logic(func, *args, **kwargs)
             scope.__enter__()
 
@@ -866,7 +878,11 @@ def create_decorator(
     return _wrapper  # type: ignore[return-value]
 
 
-def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]:
+def trace_with_opname(
+    opname: str,
+    *,
+    tracer: Optional["opentelemetry.trace.Tracer"] = None,
+) -> Callable[[Callable[P, R]], Callable[P, R]]:
     """
     Decorator to trace a function with a custom opname.
 
@@ -875,21 +891,14 @@ def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]
 
     @contextlib.contextmanager
     def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs):
-        if opentelemetry is None:
-            return None
-
-        scope = start_active_span(opname)
-        scope.__enter__()
-        try:
+        with start_active_span(opname, tracer=tracer):
             yield
-        except Exception as e:
-            scope.__exit__(type(e), None, e.__traceback__)
-            raise
-        finally:
-            scope.__exit__(None, None, None)
 
     def _decorator(func: Callable[P, R]):
-        return create_decorator(func, _wrapping_logic)
+        if not opentelemetry:
+            return func
+
+        return _custom_sync_async_decorator(func, _wrapping_logic)
 
     return _decorator
 
@@ -918,12 +927,12 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]:
     def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs):
         argspec = inspect.getfullargspec(func)
         for i, arg in enumerate(args[1:]):
-            set_attribute("ARG_" + argspec.args[i + 1], str(arg))  # type: ignore[index]
-        set_attribute("args", str(args[len(argspec.args) :]))  # type: ignore[index]
-        set_attribute("kwargs", str(kwargs))
+            set_attribute(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i + 1], str(arg))  # type: ignore[index]
+        set_attribute(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :]))  # type: ignore[index]
+        set_attribute(SynapseTags.FUNC_KWARGS, str(kwargs))
         yield
 
-    return create_decorator(func, _wrapping_logic)
+    return _custom_sync_async_decorator(func, _wrapping_logic)
 
 
 @contextlib.contextmanager
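
The `duration` example from the docstring above, filled out with the imports and type variables it assumes (the private helper is imported here purely for illustration), as a sketch of how a custom decorator is built on `_custom_sync_async_decorator`:

```py
import contextlib
import logging
import time
from typing import Callable, Generator, TypeVar

from typing_extensions import ParamSpec

from synapse.logging.tracing import _custom_sync_async_decorator

logger = logging.getLogger(__name__)

P = ParamSpec("P")
R = TypeVar("R")


def duration(func: Callable[P, R]) -> Callable[P, R]:
    """Log how long the decorated function took."""

    @contextlib.contextmanager
    def _wrapping_logic(
        func: Callable[P, R], *args: P.args, **kwargs: P.kwargs
    ) -> Generator[None, None, None]:
        start_ts = time.time()
        try:
            yield  # the decorated call runs here
        finally:
            logger.info("%s took %s seconds", func.__name__, time.time() - start_ts)

    return _custom_sync_async_decorator(func, _wrapping_logic)
```

`@duration` can then be applied to a sync function, a coroutine, or a function returning a Twisted `Deferred` alike, which is the point of the helper.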
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index f87f5098a5..0bdb213286 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -46,7 +46,14 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
-from synapse.logging.tracing import Link, get_active_span, start_active_span, trace
+from synapse.logging.tracing import (
+    Link,
+    get_active_span,
+    set_attribute,
+    start_active_span,
+    trace,
+    SynapseTags,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.controllers.state import StateStorageController
 from synapse.storage.databases import Databases
@@ -383,6 +390,12 @@ class EventsPersistenceStorageController:
             PartialStateConflictError: if attempting to persist a partial state event in
                 a room that has been un-partial stated.
         """
+        set_attribute(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            [e.event_id for e, _ in events_and_contexts],
+        )
+        set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
+
         partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
         for event, ctx in events_and_contexts:
             partitioned.setdefault(event.room_id, []).append((event, ctx))
@@ -781,7 +794,7 @@ class EventsPersistenceStorageController:
             stale_forward_extremities_counter.observe(len(stale))
 
         return result
-    
+
     async def _get_new_state_after_events(
         self,
         room_id: str,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 3fcb2069df..178536b10f 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1381,6 +1381,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             _delete_old_forward_extrem_cache_txn,
         )
 
+    @trace
     async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
         await self.db_pool.simple_upsert(
             table="insertion_event_extremities",
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f600f1190..21ba7a540e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -40,6 +40,7 @@ from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase, relation_from_event
 from synapse.events.snapshot import EventContext
+from synapse.logging.tracing import trace
 from synapse.storage._base import db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
     DatabasePool,
@@ -145,6 +146,7 @@ class PersistEventsStore:
         self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
         self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen
 
+    @trace
     async def _persist_events_and_state_updates(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 29c99c6357..449cb03276 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -54,6 +54,7 @@ from synapse.logging.context import (
     current_context,
     make_deferred_yieldable,
 )
+from synapse.logging.tracing import trace, tag_args
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -394,6 +395,8 @@ class EventsWorkerStore(SQLBaseStore):
 
         return event
 
+    @trace
+    @tag_args
     async def get_events(
         self,
         event_ids: Collection[str],
@@ -1363,6 +1366,8 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {r["event_id"] for r in rows}
 
+    @trace
+    @tag_args
     async def have_seen_events(
         self, room_id: str, event_ids: Iterable[str]
     ) -> Set[str]:
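
For reference, a rough illustration of the attributes the updated `@tag_args` records for a positional call to the decorated `have_seen_events` above (each value is the `str()` of the argument; set ordering is not guaranteed):

```py
# await store.have_seen_events("!room:example.org", {"$event_a", "$event_b"})
#
# roughly yields these attributes on the active span:
#   ARG.room_id   = "!room:example.org"
#   ARG.event_ids = "{'$event_a', '$event_b'}"
#   args          = "()"   # no extra positional arguments
#   kwargs        = "{}"   # no keyword arguments
```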