From 13855c59160b472de2eabd5f9363e9cfbf3e3439 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 5 Aug 2022 20:44:21 -0500 Subject: More tracing for federated side --- synapse/handlers/federation_event.py | 28 ++++++- synapse/logging/tracing.py | 85 ++++++++++++---------- synapse/storage/controllers/persist_events.py | 17 ++++- synapse/storage/databases/main/event_federation.py | 1 + synapse/storage/databases/main/events.py | 2 + synapse/storage/databases/main/events_worker.py | 5 ++ 6 files changed, 97 insertions(+), 41 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 740a04aad6..60436fe497 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -59,7 +59,7 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.federation.federation_client import InvalidResponseError from synapse.logging.context import nested_logging_context -from synapse.logging.tracing import trace +from synapse.logging.tracing import trace, tag_args, set_attribute, SynapseTags from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.federation import ( @@ -410,6 +410,7 @@ class FederationEventHandler: prev_member_event, ) + @trace async def process_remote_join( self, origin: str, @@ -753,6 +754,7 @@ class FederationEventHandler: await self._process_pulled_event(origin, ev, backfilled=backfilled) @trace + @tag_args async def _process_pulled_event( self, origin: str, event: EventBase, backfilled: bool ) -> None: @@ -854,6 +856,7 @@ class FederationEventHandler: else: raise + @trace async def _compute_event_context_with_maybe_missing_prevs( self, dest: str, event: EventBase ) -> EventContext: @@ -970,6 +973,8 @@ class FederationEventHandler: event, state_ids_before_event=state_map, partial_state=partial_state ) + @trace + @tag_args async def _get_state_ids_after_missing_prev_event( self, destination: str, @@ -1112,6 +1117,8 @@ class FederationEventHandler: return state_map + @trace + @tag_args async def _get_state_and_persist( self, destination: str, room_id: str, event_id: str ) -> None: @@ -1133,6 +1140,7 @@ class FederationEventHandler: destination=destination, room_id=room_id, event_ids=(event_id,) ) + @trace async def _process_received_pdu( self, origin: str, @@ -1283,6 +1291,7 @@ class FederationEventHandler: except Exception: logger.exception("Failed to resync device for %s", sender) + @trace async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None: """Handles backfilling the insertion event when we receive a marker event that points to one. @@ -1414,6 +1423,8 @@ class FederationEventHandler: return event_from_response + @trace + @tag_args async def _get_events_and_persist( self, destination: str, room_id: str, event_ids: Collection[str] ) -> None: @@ -1459,6 +1470,7 @@ class FederationEventHandler: logger.info("Fetched %i events of %i requested", len(events), len(event_ids)) await self._auth_and_persist_outliers(room_id, events) + @trace async def _auth_and_persist_outliers( self, room_id: str, events: Iterable[EventBase] ) -> None: @@ -1477,6 +1489,11 @@ class FederationEventHandler: """ event_map = {event.event_id: event for event in events} + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids", + event_map.keys(), + ) + # filter out any events we have already seen. This might happen because # the events were eagerly pushed to us (eg, during a room join), or because # another thread has raced against us since we decided to request the event. @@ -1593,6 +1610,7 @@ class FederationEventHandler: backfilled=True, ) + @trace async def _check_event_auth( self, origin: Optional[str], event: EventBase, context: EventContext ) -> None: @@ -1631,6 +1649,9 @@ class FederationEventHandler: claimed_auth_events = await self._load_or_fetch_auth_events_for_event( origin, event ) + set_attribute( + "claimed_auth_events", [ev.event_id for ev in claimed_auth_events] + ) # ... and check that the event passes auth at those auth events. # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu: @@ -1728,6 +1749,7 @@ class FederationEventHandler: ) context.rejected = RejectedReason.AUTH_ERROR + @trace async def _maybe_kick_guest_users(self, event: EventBase) -> None: if event.type != EventTypes.GuestAccess: return @@ -1935,6 +1957,8 @@ class FederationEventHandler: # instead we raise an AuthError, which will make the caller ignore it. raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found") + @trace + @tag_args async def _get_remote_auth_chain_for_event( self, destination: str, room_id: str, event_id: str ) -> None: @@ -1963,6 +1987,7 @@ class FederationEventHandler: await self._auth_and_persist_outliers(room_id, remote_auth_events) + @trace async def _run_push_actions_and_persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False ) -> None: @@ -2008,6 +2033,7 @@ class FederationEventHandler: await self._store.remove_push_actions_from_staging(event.event_id) raise + @trace async def persist_events_and_notify( self, room_id: str, diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py index 2f4555cc2e..cb557a147d 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py @@ -281,6 +281,16 @@ class SynapseTags: # The name of the external cache CACHE_NAME = "cache.name" + # Used to tag function arguments + # + # Tag a named arg. The name of the argument should be appended to this + # prefix + FUNC_ARG_PREFIX = "ARG." + # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`) + FUNC_ARGS = "args" + # Tag keyword args + FUNC_KWARGS = "kwargs" + class SynapseBaggage: FORCE_TRACING = "synapse-force-tracing" @@ -790,30 +800,28 @@ def extract_text_map( # Tracing decorators -def create_decorator( +def _custom_sync_async_decorator( func: Callable[P, R], - # TODO: What is the correct type for these `Any`? `P.args, P.kwargs` isn't allowed here wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]], ) -> Callable[P, R]: """ - Creates a decorator that is able to handle sync functions, async functions - (coroutines), and inlineDeferred from Twisted. - + Decorates a function that is sync or async (coroutines), or that returns a Twisted + `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`. Example usage: ```py - # Decorator to time the functiona and log it out + # Decorator to time the function and log it out def duration(func: Callable[P, R]) -> Callable[P, R]: @contextlib.contextmanager - def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): + def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Generator[None, None, None]: start_ts = time.time() - yield - end_ts = time.time() - duration = end_ts - start_ts - logger.info("%s took %s seconds", func.__name__, duration) - - return create_decorator(func, _wrapping_logic) + try: + yield + finally: + end_ts = time.time() + duration = end_ts - start_ts + logger.info("%s took %s seconds", func.__name__, duration) + return _custom_sync_async_decorator(func, _wrapping_logic) ``` - Args: func: The function to be decorated wrapping_logic: The business logic of your custom decorator. @@ -821,14 +829,18 @@ def create_decorator( before/after the function as desired. """ - @wraps(func) - async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - if inspect.iscoroutinefunction(func): + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: with wrapping_logic(func, *args, **kwargs): - return await func(*args, **kwargs) - else: - # The other case here handles both sync functions and those - # decorated with inlineDeferred. + return await func(*args, **kwargs) # type: ignore[misc] + + else: + # The other case here handles both sync functions and those + # decorated with inlineDeferred. + @wraps(func) + def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: scope = wrapping_logic(func, *args, **kwargs) scope.__enter__() @@ -866,7 +878,11 @@ def create_decorator( return _wrapper # type: ignore[return-value] -def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]: +def trace_with_opname( + opname: str, + *, + tracer: Optional["opentelemetry.trace.Tracer"] = None, +) -> Callable[[Callable[P, R]], Callable[P, R]]: """ Decorator to trace a function with a custom opname. @@ -875,21 +891,14 @@ def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]] @contextlib.contextmanager def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): - if opentelemetry is None: - return None - - scope = start_active_span(opname) - scope.__enter__() - try: + with start_active_span(opname, tracer=tracer): yield - except Exception as e: - scope.__exit__(type(e), None, e.__traceback__) - raise - finally: - scope.__exit__(None, None, None) def _decorator(func: Callable[P, R]): - return create_decorator(func, _wrapping_logic) + if not opentelemetry: + return func + + return _custom_sync_async_decorator(func, _wrapping_logic) return _decorator @@ -918,12 +927,12 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]: def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): argspec = inspect.getfullargspec(func) for i, arg in enumerate(args[1:]): - set_attribute("ARG_" + argspec.args[i + 1], str(arg)) # type: ignore[index] - set_attribute("args", str(args[len(argspec.args) :])) # type: ignore[index] - set_attribute("kwargs", str(kwargs)) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i + 1], str(arg)) # type: ignore[index] + set_attribute(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :])) # type: ignore[index] + set_attribute(SynapseTags.FUNC_KWARGS, str(kwargs)) yield - return create_decorator(func, _wrapping_logic) + return _custom_sync_async_decorator(func, _wrapping_logic) @contextlib.contextmanager diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index f87f5098a5..0bdb213286 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -46,7 +46,14 @@ from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable -from synapse.logging.tracing import Link, get_active_span, start_active_span, trace +from synapse.logging.tracing import ( + Link, + get_active_span, + set_attribute, + start_active_span, + trace, + SynapseTags, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.controllers.state import StateStorageController from synapse.storage.databases import Databases @@ -383,6 +390,12 @@ class EventsPersistenceStorageController: PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. """ + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids", + [e.event_id for e, _ in events_and_contexts], + ) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) + partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {} for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) @@ -781,7 +794,7 @@ class EventsPersistenceStorageController: stale_forward_extremities_counter.observe(len(stale)) return result - + async def _get_new_state_after_events( self, room_id: str, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 3fcb2069df..178536b10f 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1381,6 +1381,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas _delete_old_forward_extrem_cache_txn, ) + @trace async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None: await self.db_pool.simple_upsert( table="insertion_event_extremities", diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1f600f1190..21ba7a540e 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -40,6 +40,7 @@ from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext +from synapse.logging.tracing import trace from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -145,6 +146,7 @@ class PersistEventsStore: self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen + @trace async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 29c99c6357..449cb03276 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -54,6 +54,7 @@ from synapse.logging.context import ( current_context, make_deferred_yieldable, ) +from synapse.logging.tracing import trace, tag_args from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, @@ -394,6 +395,8 @@ class EventsWorkerStore(SQLBaseStore): return event + @trace + @tag_args async def get_events( self, event_ids: Collection[str], @@ -1363,6 +1366,8 @@ class EventsWorkerStore(SQLBaseStore): return {r["event_id"] for r in rows} + @trace + @tag_args async def have_seen_events( self, room_id: str, event_ids: Iterable[str] ) -> Set[str]: -- cgit 1.4.1