summary refs log tree commit diff
path: root/synapse/handlers/federation_event.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2022-08-24 09:58:29 +0100
committerGitHub <noreply@github.com>2022-08-24 09:58:29 +0100
commitcbb157548676865793f39b4da0b7f3fa5ee01058 (patch)
tree77ff41ddcb9b01938b4efdd109cf90981c86a23e /synapse/handlers/federation_event.py
parentNewsfile (diff)
parentInstrument `_check_sigs_and_hash_and_fetch` to trace time spent in child conc... (diff)
downloadsynapse-cbb157548676865793f39b4da0b7f3fa5ee01058.tar.xz
Merge branch 'develop' into erikj/less_state_membership github/erikj/less_state_membership erikj/less_state_membership
Diffstat (limited to 'synapse/handlers/federation_event.py')
-rw-r--r--synapse/handlers/federation_event.py399
1 files changed, 315 insertions, 84 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index a5f4ce7c8a..048c4111f6 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -29,7 +29,7 @@ from typing import (
     Tuple,
 )
 
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
 
 from synapse import event_auth
 from synapse.api.constants import (
@@ -59,6 +59,13 @@ from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.logging.context import nested_logging_context
+from synapse.logging.opentracing import (
+    SynapseTags,
+    set_tag,
+    start_active_span,
+    tag_args,
+    trace,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.replication.http.federation import (
@@ -91,6 +98,36 @@ soft_failed_event_counter = Counter(
     "Events received over federation that we marked as soft_failed",
 )
 
+# Added to debug performance and track progress on optimizations
+backfill_processing_after_timer = Histogram(
+    "synapse_federation_backfill_processing_after_time_seconds",
+    "sec",
+    [],
+    buckets=(
+        0.1,
+        0.25,
+        0.5,
+        1.0,
+        2.5,
+        5.0,
+        7.5,
+        10.0,
+        15.0,
+        20.0,
+        25.0,
+        30.0,
+        40.0,
+        50.0,
+        60.0,
+        80.0,
+        100.0,
+        120.0,
+        150.0,
+        180.0,
+        "+Inf",
+    ),
+)
+
 
 class FederationEventHandler:
     """Handles events that originated from federation.
@@ -278,7 +315,8 @@ class FederationEventHandler:
                 )
 
         try:
-            await self._process_received_pdu(origin, pdu, state_ids=None)
+            context = await self._state_handler.compute_event_context(pdu)
+            await self._process_received_pdu(origin, pdu, context)
         except PartialStateConflictError:
             # The room was un-partial stated while we were processing the PDU.
             # Try once more, with full state this time.
@@ -286,7 +324,8 @@ class FederationEventHandler:
                 "Room %s was un-partial stated while processing the PDU, trying again.",
                 room_id,
             )
-            await self._process_received_pdu(origin, pdu, state_ids=None)
+            context = await self._state_handler.compute_event_context(pdu)
+            await self._process_received_pdu(origin, pdu, context)
 
     async def on_send_membership_event(
         self, origin: str, event: EventBase
@@ -316,6 +355,7 @@ class FederationEventHandler:
             The event and context of the event after inserting it into the room graph.
 
         Raises:
+            RuntimeError if any prev_events are missing
             SynapseError if the event is not accepted into the room
             PartialStateConflictError if the room was un-partial stated in between
                 computing the state at the event and persisting it. The caller should
@@ -376,7 +416,7 @@ class FederationEventHandler:
         # need to.
         await self._event_creation_handler.cache_joined_hosts_for_event(event, context)
 
-        await self._check_for_soft_fail(event, None, origin=origin)
+        await self._check_for_soft_fail(event, context=context, origin=origin)
         await self._run_push_actions_and_persist_event(event, context)
         return event, context
 
@@ -406,6 +446,7 @@ class FederationEventHandler:
             prev_member_event,
         )
 
+    @trace
     async def process_remote_join(
         self,
         origin: str,
@@ -534,32 +575,36 @@ class FederationEventHandler:
             #
             # This is the same operation as we do when we receive a regular event
             # over federation.
-            state_ids = await self._resolve_state_at_missing_prevs(destination, event)
-
-            # build a new state group for it if need be
-            context = await self._state_handler.compute_event_context(
-                event,
-                state_ids_before_event=state_ids,
+            context = await self._compute_event_context_with_maybe_missing_prevs(
+                destination, event
             )
             if context.partial_state:
                 # this can happen if some or all of the event's prev_events still have
-                # partial state - ie, an event has an earlier stream_ordering than one
-                # or more of its prev_events, so we de-partial-state it before its
-                # prev_events.
+                # partial state. We were careful to only pick events from the db without
+                # partial-state prev events, so that implies that a prev event has
+                # been persisted (with partial state) since we did the query.
                 #
-                # TODO(faster_joins): we probably need to be more intelligent, and
-                #    exclude partial-state prev_events from consideration
-                #    https://github.com/matrix-org/synapse/issues/13001
+                # So, let's just ignore `event` for now; when we re-run the db query
+                # we should instead get its partial-state prev event, which we will
+                # de-partial-state, and then come back to event.
                 logger.warning(
-                    "%s still has partial state: can't de-partial-state it yet",
+                    "%s still has prev_events with partial state: can't de-partial-state it yet",
                     event.event_id,
                 )
                 return
+
+            # since the state at this event has changed, we should now re-evaluate
+            # whether it should have been rejected. We must already have all of the
+            # auth events (from last time we went round this path), so there is no
+            # need to pass the origin.
+            await self._check_event_auth(None, event, context)
+
             await self._store.update_state_for_partial_state_event(event, context)
             self._state_storage_controller.notify_event_un_partial_stated(
                 event.event_id
             )
 
+    @trace
     async def backfill(
         self, dest: str, room_id: str, limit: int, extremities: Collection[str]
     ) -> None:
@@ -589,21 +634,23 @@ class FederationEventHandler:
         if not events:
             return
 
-        # if there are any events in the wrong room, the remote server is buggy and
-        # should not be trusted.
-        for ev in events:
-            if ev.room_id != room_id:
-                raise InvalidResponseError(
-                    f"Remote server {dest} returned event {ev.event_id} which is in "
-                    f"room {ev.room_id}, when we were backfilling in {room_id}"
-                )
+        with backfill_processing_after_timer.time():
+            # if there are any events in the wrong room, the remote server is buggy and
+            # should not be trusted.
+            for ev in events:
+                if ev.room_id != room_id:
+                    raise InvalidResponseError(
+                        f"Remote server {dest} returned event {ev.event_id} which is in "
+                        f"room {ev.room_id}, when we were backfilling in {room_id}"
+                    )
 
-        await self._process_pulled_events(
-            dest,
-            events,
-            backfilled=True,
-        )
+            await self._process_pulled_events(
+                dest,
+                events,
+                backfilled=True,
+            )
 
+    @trace
     async def _get_missing_events_for_pdu(
         self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
     ) -> None:
@@ -704,8 +751,9 @@ class FederationEventHandler:
         logger.info("Got %d prev_events", len(missing_events))
         await self._process_pulled_events(origin, missing_events, backfilled=False)
 
+    @trace
     async def _process_pulled_events(
-        self, origin: str, events: Iterable[EventBase], backfilled: bool
+        self, origin: str, events: Collection[EventBase], backfilled: bool
     ) -> None:
         """Process a batch of events we have pulled from a remote server
 
@@ -720,6 +768,15 @@ class FederationEventHandler:
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
         """
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str([event.event_id for event in events]),
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(events)),
+        )
+        set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
         logger.debug(
             "processing pulled backfilled=%s events=%s",
             backfilled,
@@ -742,6 +799,8 @@ class FederationEventHandler:
             with nested_logging_context(ev.event_id):
                 await self._process_pulled_event(origin, ev, backfilled=backfilled)
 
+    @trace
+    @tag_args
     async def _process_pulled_event(
         self, origin: str, event: EventBase, backfilled: bool
     ) -> None:
@@ -793,7 +852,7 @@ class FederationEventHandler:
         if existing:
             if not existing.internal_metadata.is_outlier():
                 logger.info(
-                    "Ignoring received event %s which we have already seen",
+                    "_process_pulled_event: Ignoring received event %s which we have already seen",
                     event_id,
                 )
                 return
@@ -806,29 +865,56 @@ class FederationEventHandler:
             return
 
         try:
-            state_ids = await self._resolve_state_at_missing_prevs(origin, event)
-            # TODO(faster_joins): make sure that _resolve_state_at_missing_prevs does
-            #   not return partial state
-            #   https://github.com/matrix-org/synapse/issues/13002
+            try:
+                context = await self._compute_event_context_with_maybe_missing_prevs(
+                    origin, event
+                )
+                await self._process_received_pdu(
+                    origin,
+                    event,
+                    context,
+                    backfilled=backfilled,
+                )
+            except PartialStateConflictError:
+                # The room was un-partial stated while we were processing the event.
+                # Try once more, with full state this time.
+                context = await self._compute_event_context_with_maybe_missing_prevs(
+                    origin, event
+                )
 
-            await self._process_received_pdu(
-                origin, event, state_ids=state_ids, backfilled=backfilled
-            )
+                # We ought to have full state now, barring some unlikely race where we left and
+                # rejoned the room in the background.
+                if context.partial_state:
+                    raise AssertionError(
+                        f"Event {event.event_id} still has a partial resolved state "
+                        f"after room {event.room_id} was un-partial stated"
+                    )
+
+                await self._process_received_pdu(
+                    origin,
+                    event,
+                    context,
+                    backfilled=backfilled,
+                )
         except FederationError as e:
             if e.code == 403:
                 logger.warning("Pulled event %s failed history check.", event_id)
             else:
                 raise
 
-    async def _resolve_state_at_missing_prevs(
+    @trace
+    async def _compute_event_context_with_maybe_missing_prevs(
         self, dest: str, event: EventBase
-    ) -> Optional[StateMap[str]]:
-        """Calculate the state at an event with missing prev_events.
+    ) -> EventContext:
+        """Build an EventContext structure for a non-outlier event whose prev_events may
+        be missing.
 
-        This is used when we have pulled a batch of events from a remote server, and
-        still don't have all the prev_events.
+        This is used when we have pulled a batch of events from a remote server, and may
+        not have all the prev_events.
 
-        If we already have all the prev_events for `event`, this method does nothing.
+        To build an EventContext, we need to calculate the state before the event. If we
+        already have all the prev_events for `event`, we can simply use the state after
+        the prev_events to calculate the state before `event`.
 
         Otherwise, the missing prevs become new backwards extremities, and we fall back
         to asking the remote server for the state after each missing `prev_event`,
@@ -849,8 +935,7 @@ class FederationEventHandler:
             event: an event to check for missing prevs.
 
         Returns:
-            if we already had all the prev events, `None`. Otherwise, returns
-            the event ids of the state at `event`.
+            The event context.
 
         Raises:
             FederationError if we fail to get the state from the remote server after any
@@ -864,7 +949,7 @@ class FederationEventHandler:
         missing_prevs = prevs - seen
 
         if not missing_prevs:
-            return None
+            return await self._state_handler.compute_event_context(event)
 
         logger.info(
             "Event %s is missing prev_events %s: calculating state for a "
@@ -876,9 +961,15 @@ class FederationEventHandler:
         # resolve them to find the correct state at the current event.
 
         try:
+            # Determine whether we may be about to retrieve partial state
+            # Events may be un-partial stated right after we compute the partial state
+            # flag, but that's okay, as long as the flag errs on the conservative side.
+            partial_state_flags = await self._store.get_partial_state_events(seen)
+            partial_state = any(partial_state_flags.values())
+
             # Get the state of the events we know about
             ours = await self._state_storage_controller.get_state_groups_ids(
-                room_id, seen
+                room_id, seen, await_full_state=False
             )
 
             # state_maps is a list of mappings from (type, state_key) to event_id
@@ -924,8 +1015,12 @@ class FederationEventHandler:
                 "We can't get valid state history.",
                 affected=event_id,
             )
-        return state_map
+        return await self._state_handler.compute_event_context(
+            event, state_ids_before_event=state_map, partial_state=partial_state
+        )
 
+    @trace
+    @tag_args
     async def _get_state_ids_after_missing_prev_event(
         self,
         destination: str,
@@ -965,10 +1060,10 @@ class FederationEventHandler:
         logger.debug("Fetching %i events from cache/store", len(desired_events))
         have_events = await self._store.have_seen_events(room_id, desired_events)
 
-        missing_desired_events = desired_events - have_events
+        missing_desired_event_ids = desired_events - have_events
         logger.debug(
             "We are missing %i events (got %i)",
-            len(missing_desired_events),
+            len(missing_desired_event_ids),
             len(have_events),
         )
 
@@ -980,13 +1075,30 @@ class FederationEventHandler:
         #   already have a bunch of the state events. It would be nice if the
         #   federation api gave us a way of finding out which we actually need.
 
-        missing_auth_events = set(auth_event_ids) - have_events
-        missing_auth_events.difference_update(
-            await self._store.have_seen_events(room_id, missing_auth_events)
+        missing_auth_event_ids = set(auth_event_ids) - have_events
+        missing_auth_event_ids.difference_update(
+            await self._store.have_seen_events(room_id, missing_auth_event_ids)
         )
-        logger.debug("We are also missing %i auth events", len(missing_auth_events))
+        logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))
+
+        missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
 
-        missing_events = missing_desired_events | missing_auth_events
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_auth_event_ids",
+            str(missing_auth_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length",
+            str(len(missing_auth_event_ids)),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_desired_event_ids",
+            str(missing_desired_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length",
+            str(len(missing_desired_event_ids)),
+        )
 
         # Making an individual request for each of 1000s of events has a lot of
         # overhead. On the other hand, we don't really want to fetch all of the events
@@ -997,13 +1109,13 @@ class FederationEventHandler:
         #
         # TODO: might it be better to have an API which lets us do an aggregate event
         #   request
-        if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
+        if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
             logger.debug("Requesting complete state from remote")
             await self._get_state_and_persist(destination, room_id, event_id)
         else:
-            logger.debug("Fetching %i events from remote", len(missing_events))
+            logger.debug("Fetching %i events from remote", len(missing_event_ids))
             await self._get_events_and_persist(
-                destination=destination, room_id=room_id, event_ids=missing_events
+                destination=destination, room_id=room_id, event_ids=missing_event_ids
             )
 
         # We now need to fill out the state map, which involves fetching the
@@ -1060,6 +1172,14 @@ class FederationEventHandler:
                 event_id,
                 failed_to_fetch,
             )
+            set_tag(
+                SynapseTags.RESULT_PREFIX + "failed_to_fetch",
+                str(failed_to_fetch),
+            )
+            set_tag(
+                SynapseTags.RESULT_PREFIX + "failed_to_fetch.length",
+                str(len(failed_to_fetch)),
+            )
 
         if remote_event.is_state() and remote_event.rejected_reason is None:
             state_map[
@@ -1068,6 +1188,8 @@ class FederationEventHandler:
 
         return state_map
 
+    @trace
+    @tag_args
     async def _get_state_and_persist(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1089,11 +1211,12 @@ class FederationEventHandler:
                 destination=destination, room_id=room_id, event_ids=(event_id,)
             )
 
+    @trace
     async def _process_received_pdu(
         self,
         origin: str,
         event: EventBase,
-        state_ids: Optional[StateMap[str]],
+        context: EventContext,
         backfilled: bool = False,
     ) -> None:
         """Called when we have a new non-outlier event.
@@ -1115,24 +1238,18 @@ class FederationEventHandler:
 
             event: event to be persisted
 
-            state_ids: Normally None, but if we are handling a gap in the graph
-                (ie, we are missing one or more prev_events), the resolved state at the
-                event. Must not be partial state.
+            context: The `EventContext` to persist the event with.
 
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
 
         PartialStateConflictError: if the room was un-partial stated in between
-            computing the state at the event and persisting it. The caller should retry
-            exactly once in this case. Will never be raised if `state_ids` is provided.
+            computing the state at the event and persisting it. The caller should
+            recompute `context` and retry exactly once when this happens.
         """
         logger.debug("Processing event: %s", event)
         assert not event.internal_metadata.outlier
 
-        context = await self._state_handler.compute_event_context(
-            event,
-            state_ids_before_event=state_ids,
-        )
         try:
             await self._check_event_auth(origin, event, context)
         except AuthError as e:
@@ -1144,7 +1261,7 @@ class FederationEventHandler:
             # For new (non-backfilled and non-outlier) events we check if the event
             # passes auth based on the current state. If it doesn't then we
             # "soft-fail" the event.
-            await self._check_for_soft_fail(event, state_ids, origin=origin)
+            await self._check_for_soft_fail(event, context=context, origin=origin)
 
         await self._run_push_actions_and_persist_event(event, context, backfilled)
 
@@ -1245,6 +1362,7 @@ class FederationEventHandler:
         except Exception:
             logger.exception("Failed to resync device for %s", sender)
 
+    @trace
     async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
         """Handles backfilling the insertion event when we receive a marker
         event that points to one.
@@ -1276,7 +1394,7 @@ class FederationEventHandler:
         logger.debug("_handle_marker_event: received %s", marker_event)
 
         insertion_event_id = marker_event.content.get(
-            EventContentFields.MSC2716_MARKER_INSERTION
+            EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
         )
 
         if insertion_event_id is None:
@@ -1329,6 +1447,55 @@ class FederationEventHandler:
             marker_event,
         )
 
+    async def backfill_event_id(
+        self, destination: str, room_id: str, event_id: str
+    ) -> EventBase:
+        """Backfill a single event and persist it as a non-outlier which means
+        we also pull in all of the state and auth events necessary for it.
+
+        Args:
+            destination: The homeserver to pull the given event_id from.
+            room_id: The room where the event is from.
+            event_id: The event ID to backfill.
+
+        Raises:
+            FederationError if we are unable to find the event from the destination
+        """
+        logger.info(
+            "backfill_event_id: event_id=%s from destination=%s", event_id, destination
+        )
+
+        room_version = await self._store.get_room_version(room_id)
+
+        event_from_response = await self._federation_client.get_pdu(
+            [destination],
+            event_id,
+            room_version,
+        )
+
+        if not event_from_response:
+            raise FederationError(
+                "ERROR",
+                404,
+                "Unable to find event_id=%s from destination=%s to backfill."
+                % (event_id, destination),
+                affected=event_id,
+            )
+
+        # Persist the event we just fetched, including pulling all of the state
+        # and auth events to de-outlier it. This also sets up the necessary
+        # `state_groups` for the event.
+        await self._process_pulled_events(
+            destination,
+            [event_from_response],
+            # Prevent notifications going to clients
+            backfilled=True,
+        )
+
+        return event_from_response
+
+    @trace
+    @tag_args
     async def _get_events_and_persist(
         self, destination: str, room_id: str, event_ids: Collection[str]
     ) -> None:
@@ -1374,6 +1541,7 @@ class FederationEventHandler:
         logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
         await self._auth_and_persist_outliers(room_id, events)
 
+    @trace
     async def _auth_and_persist_outliers(
         self, room_id: str, events: Iterable[EventBase]
     ) -> None:
@@ -1392,6 +1560,16 @@ class FederationEventHandler:
         """
         event_map = {event.event_id: event for event in events}
 
+        event_ids = event_map.keys()
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str(event_ids),
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(event_ids)),
+        )
+
         # filter out any events we have already seen. This might happen because
         # the events were eagerly pushed to us (eg, during a room join), or because
         # another thread has raced against us since we decided to request the event.
@@ -1508,14 +1686,17 @@ class FederationEventHandler:
             backfilled=True,
         )
 
+    @trace
     async def _check_event_auth(
-        self, origin: str, event: EventBase, context: EventContext
+        self, origin: Optional[str], event: EventBase, context: EventContext
     ) -> None:
         """
         Checks whether an event should be rejected (for failing auth checks).
 
         Args:
-            origin: The host the event originates from.
+            origin: The host the event originates from. This is used to fetch
+               any missing auth events. It can be set to None, but only if we are
+               sure that we already have all the auth events.
             event: The event itself.
             context:
                 The event context.
@@ -1544,6 +1725,14 @@ class FederationEventHandler:
         claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
             origin, event
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "claimed_auth_events",
+            str([ev.event_id for ev in claimed_auth_events]),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "claimed_auth_events.length",
+            str(len(claimed_auth_events)),
+        )
 
         # ... and check that the event passes auth at those auth events.
         # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1641,6 +1830,7 @@ class FederationEventHandler:
             )
             context.rejected = RejectedReason.AUTH_ERROR
 
+    @trace
     async def _maybe_kick_guest_users(self, event: EventBase) -> None:
         if event.type != EventTypes.GuestAccess:
             return
@@ -1658,17 +1848,27 @@ class FederationEventHandler:
     async def _check_for_soft_fail(
         self,
         event: EventBase,
-        state_ids: Optional[StateMap[str]],
+        context: EventContext,
         origin: str,
     ) -> None:
         """Checks if we should soft fail the event; if so, marks the event as
         such.
 
+        Does nothing for events in rooms with partial state, since we may not have an
+        accurate membership event for the sender in the current state.
+
         Args:
             event
-            state_ids: The state at the event if we don't have all the event's prev events
+            context: The `EventContext` which we are about to persist the event with.
             origin: The host the event originates from.
         """
+        if await self._store.is_partial_state_room(event.room_id):
+            # We might not know the sender's membership in the current state, so don't
+            # soft fail anything. Even if we do have a membership for the sender in the
+            # current state, it may have been derived from state resolution between
+            # partial and full state and may not be accurate.
+            return
+
         extrem_ids_list = await self._store.get_latest_event_ids_in_room(event.room_id)
         extrem_ids = set(extrem_ids_list)
         prev_event_ids = set(event.prev_event_ids())
@@ -1685,11 +1885,15 @@ class FederationEventHandler:
         auth_types = auth_types_for_event(room_version_obj, event)
 
         # Calculate the "current state".
-        if state_ids is not None:
-            # If we're explicitly given the state then we won't have all the
-            # prev events, and so we have a gap in the graph. In this case
-            # we want to be a little careful as we might have been down for
-            # a while and have an incorrect view of the current state,
+        seen_event_ids = await self._store.have_events_in_timeline(prev_event_ids)
+        has_missing_prevs = bool(prev_event_ids - seen_event_ids)
+        if has_missing_prevs:
+            # We don't have all the prev_events of this event, which means we have a
+            # gap in the graph, and the new event is going to become a new backwards
+            # extremity.
+            #
+            # In this case we want to be a little careful as we might have been
+            # down for a while and have an incorrect view of the current state,
             # however we still want to do checks as gaps are easy to
             # maliciously manufacture.
             #
@@ -1702,6 +1906,7 @@ class FederationEventHandler:
                 event.room_id, extrem_ids
             )
             state_sets: List[StateMap[str]] = list(state_sets_d.values())
+            state_ids = await context.get_prev_state_ids()
             state_sets.append(state_ids)
             current_state_ids = (
                 await self._state_resolution_handler.resolve_events_with_store(
@@ -1751,7 +1956,7 @@ class FederationEventHandler:
             event.internal_metadata.soft_failed = True
 
     async def _load_or_fetch_auth_events_for_event(
-        self, destination: str, event: EventBase
+        self, destination: Optional[str], event: EventBase
     ) -> Collection[EventBase]:
         """Fetch this event's auth_events, from database or remote
 
@@ -1767,12 +1972,19 @@ class FederationEventHandler:
         Args:
             destination: where to send the /event_auth request. Typically the server
                that sent us `event` in the first place.
+
+               If this is None, no attempt is made to load any missing auth events:
+               rather, an AssertionError is raised if there are any missing events.
+
             event: the event whose auth_events we want
 
         Returns:
             all of the events listed in `event.auth_events_ids`, after deduplication
 
         Raises:
+            AssertionError if some auth events were missing and no `destination` was
+            supplied.
+
             AuthError if we were unable to fetch the auth_events for any reason.
         """
         event_auth_event_ids = set(event.auth_event_ids())
@@ -1784,6 +1996,13 @@ class FederationEventHandler:
         )
         if not missing_auth_event_ids:
             return event_auth_events.values()
+        if destination is None:
+            # this shouldn't happen: destination must be set unless we know we have already
+            # persisted the auth events.
+            raise AssertionError(
+                "_load_or_fetch_auth_events_for_event() called with no destination for "
+                "an event with missing auth_events"
+            )
 
         logger.info(
             "Event %s refers to unknown auth events %s: fetching auth chain",
@@ -1819,6 +2038,8 @@ class FederationEventHandler:
         # instead we raise an AuthError, which will make the caller ignore it.
         raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
 
+    @trace
+    @tag_args
     async def _get_remote_auth_chain_for_event(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1847,6 +2068,7 @@ class FederationEventHandler:
 
         await self._auth_and_persist_outliers(room_id, remote_auth_events)
 
+    @trace
     async def _run_push_actions_and_persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
     ) -> None:
@@ -1955,8 +2177,17 @@ class FederationEventHandler:
                     self._message_handler.maybe_schedule_expiry(event)
 
             if not backfilled:  # Never notify for backfilled events
-                for event in events:
-                    await self._notify_persisted_event(event, max_stream_token)
+                with start_active_span("notify_persisted_events"):
+                    set_tag(
+                        SynapseTags.RESULT_PREFIX + "event_ids",
+                        str([ev.event_id for ev in events]),
+                    )
+                    set_tag(
+                        SynapseTags.RESULT_PREFIX + "event_ids.length",
+                        str(len(events)),
+                    )
+                    for event in events:
+                        await self._notify_persisted_event(event, max_stream_token)
 
             return max_stream_token.stream