-rw-r--r--  synapse/handlers/federation_event.py            |  6
-rw-r--r--  synapse/storage/controllers/persist_events.py   | 13
-rw-r--r--  synapse/storage/databases/main/events_worker.py | 19
3 files changed, 27 insertions, 11 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 61882fb40b..578b494185 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -61,8 +61,8 @@ from synapse.federation.federation_client import InvalidResponseError
 from synapse.logging.context import nested_logging_context
 from synapse.logging.tracing import (
     SynapseTags,
-    start_active_span,
     set_attribute,
+    start_active_span,
     tag_args,
     trace,
 )
@@ -722,7 +722,7 @@ class FederationEventHandler:

     @trace
     async def _process_pulled_events(
-        self, origin: str, events: Iterable[EventBase], backfilled: bool
+        self, origin: str, events: List[EventBase], backfilled: bool
     ) -> None:
         """Process a batch of events we have pulled from a remote server

@@ -1662,7 +1662,7 @@ class FederationEventHandler:
             origin, event
         )
         set_attribute(
-            "claimed_auth_events", [ev.event_id for ev in claimed_auth_events]
+            "claimed_auth_events", str([ev.event_id for ev in claimed_auth_events])
         )

         # ... and check that the event passes auth at those auth events.
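The hunk above tightens _process_pulled_events from Iterable[EventBase] to List[EventBase] and stringifies the claimed_auth_events attribute before handing it to set_attribute. One plausible reason for the stricter parameter type is that a batch which is both recorded for tracing and then processed gets iterated twice, which silently loses the second pass if a one-shot generator is passed in. A minimal, self-contained sketch of that pitfall (the Event and handle names here are illustrative, not Synapse code):

from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class Event:
    event_id: str


def handle(events: Iterable[Event]) -> List[str]:
    # First pass: collect the IDs, e.g. to attach them to a tracing span.
    ids = [e.event_id for e in events]
    print("batch:", ids)

    # Second pass: "process" each event. If `events` was a generator, the
    # first pass already exhausted it and nothing happens here.
    return [e.event_id for e in events]


events = [Event("$a"), Event("$b")]
assert handle(events) == ["$a", "$b"]       # a list survives both passes
assert handle(e for e in events) == []      # a one-shot generator does not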
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 7cb69fa4f3..8a039dbc83 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -390,15 +390,18 @@ class EventsPersistenceStorageController:
             PartialStateConflictError: if attempting to persist a partial state event in
                 a room that has been un-partial stated.
         """
-        set_attribute(
-            SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events_and_contexts)})",
-            str([e.event_id for e, _ in events_and_contexts]),
-        )
-        set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
+        event_ids: List[str] = []

         partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
         for event, ctx in events_and_contexts:
             partitioned.setdefault(event.room_id, []).append((event, ctx))
+            event_ids.append(event.event_id)
+
+        set_attribute(
+            SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(event_ids)})",
+            str(event_ids),
+        )
+        set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))

         async def enqueue(
             item: Tuple[str, List[Tuple[EventBase, EventContext]]]
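In the persist_events.py hunk above, the event_ids recorded on the tracing span are now collected inside the loop that already partitions the batch by room, so the batch is walked exactly once and the recorded IDs match what was actually partitioned. A rough stand-alone sketch of that shape (generic Event/Context types and a print standing in for set_attribute, all illustrative rather than Synapse code):

from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Event:
    event_id: str
    room_id: str


@dataclass
class Context:
    state_group: int = 0


def partition_by_room(
    events_and_contexts: List[Tuple[Event, Context]]
) -> Dict[str, List[Tuple[Event, Context]]]:
    event_ids: List[str] = []
    partitioned: Dict[str, List[Tuple[Event, Context]]] = {}

    # Single pass: bucket each event by room and remember its ID for tracing.
    for event, ctx in events_and_contexts:
        partitioned.setdefault(event.room_id, []).append((event, ctx))
        event_ids.append(event.event_id)

    # Stand-in for the two set_attribute() calls: values are stringified.
    print(f"event_ids ({len(event_ids)}):", str(event_ids))
    return partitioned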
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 00f9298a9f..c547ba7afd 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -54,7 +54,7 @@ from synapse.logging.context import (
     current_context,
     make_deferred_yieldable,
 )
-from synapse.logging.tracing import tag_args, trace
+from synapse.logging.tracing import start_active_span, tag_args, trace
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -395,8 +395,6 @@ class EventsWorkerStore(SQLBaseStore):

         return event

-    @trace
-    @tag_args
     async def get_events(
         self,
         event_ids: Collection[str],
@@ -433,6 +431,8 @@ class EventsWorkerStore(SQLBaseStore):

         return {e.event_id: e for e in events}

+    @trace
+    @tag_args
     async def get_events_as_list(
         self,
         event_ids: Collection[str],
@@ -1034,6 +1034,11 @@ class EventsWorkerStore(SQLBaseStore):
         fetched_events: Dict[str, _EventRow] = {}
         events_to_fetch = event_ids

+        is_recording_redaction_trace = False
+        fetching_redactions_tracing_span_cm = start_active_span(
+            "recursively fetching redactions"
+        )
+
         while events_to_fetch:
             row_map = await self._enqueue_events(events_to_fetch)

@@ -1049,6 +1054,14 @@ class EventsWorkerStore(SQLBaseStore):
             events_to_fetch = redaction_ids.difference(fetched_event_ids)
             if events_to_fetch:
                 logger.debug("Also fetching redaction events %s", events_to_fetch)
+                # Start tracing how long it takes for us to get all of the redactions
+                if not is_recording_redaction_trace:
+                    fetching_redactions_tracing_span_cm.__enter__()
+                    is_recording_redaction_trace = True
+
+        # Only stop recording if we were recording in the first place
+        if is_recording_redaction_trace:
+            fetching_redactions_tracing_span_cm.__exit__(None, None, None)

         # build a map from event_id to EventBase
         event_map: Dict[str, EventBase] = {}
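The events_worker.py hunks above measure how long the recursive redaction fetching takes without paying for a span when no redactions are found: the context manager returned by start_active_span is driven by hand, entered only when the loop first discovers more events to fetch and exited once after the loop, guarded by a flag. A self-contained sketch of that pattern with a stand-in timing span (the span helper and fetch_batch callback are illustrative, not Synapse APIs):

import time
from contextlib import contextmanager
from typing import Callable, Dict, Iterable, Set


@contextmanager
def span(name: str):
    # Stand-in for a tracing span: just times the enclosed work.
    start = time.monotonic()
    try:
        yield
    finally:
        print(f"{name} took {time.monotonic() - start:.3f}s")


def fetch_with_redactions(
    event_ids: Iterable[str],
    fetch_batch: Callable[[Set[str]], Dict[str, Set[str]]],
) -> Dict[str, Set[str]]:
    """fetch_batch maps each requested ID to the redaction IDs it references."""
    redaction_span_cm = span("recursively fetching redactions")
    recording = False

    fetched: Dict[str, Set[str]] = {}
    to_fetch = set(event_ids)
    while to_fetch:
        rows = fetch_batch(to_fetch)
        fetched.update(rows)

        # Follow any referenced redactions we have not fetched yet.
        redaction_ids = {r for refs in rows.values() for r in refs}
        to_fetch = redaction_ids - fetched.keys()
        if to_fetch and not recording:
            # Enter the span only once, when the first extra round trip starts.
            redaction_span_cm.__enter__()
            recording = True

    # Only close the span if it was opened.
    if recording:
        redaction_span_cm.__exit__(None, None, None)
    return fetched


store = {"$a": {"$r1"}, "$r1": set()}
print(fetch_with_redactions(["$a"], lambda ids: {i: store[i] for i in ids}))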