From c3f2f0f063e5d97e1ae396b85330ea249499f077 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jan 2024 12:29:42 +0000 Subject: Faster partial join to room with complex auth graph (#7) Instead of persisting outliers in a bunch of batches, let's just do them all at once. This is fine because all `_auth_and_persist_outliers_inner` is doing is checking the auth rules for each event, which requires the events to be topologically sorted by the auth graph. --- synapse/handlers/federation_event.py | 79 ++++++++++++++---------------------- 1 file changed, 30 insertions(+), 49 deletions(-) (limited to 'synapse/handlers/federation_event.py') diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 882be905db..398f19eec0 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -94,7 +94,7 @@ from synapse.types import ( ) from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched +from synapse.util.iterutils import batch_iter, partition, sorted_topologically from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr @@ -1678,57 +1678,36 @@ class FederationEventHandler: # We need to persist an event's auth events before the event. auth_graph = { - ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map] + ev.event_id: [e_id for e_id in ev.auth_event_ids() if e_id in event_map] for ev in event_map.values() } - for roots in sorted_topologically_batched(event_map.values(), auth_graph): - if not roots: - # if *none* of the remaining events are ready, that means - # we have a loop. This either means a bug in our logic, or that - # somebody has managed to create a loop (which requires finding a - # hash collision in room v2 and later). 
- logger.warning( - "Loop found in auth events while fetching missing state/auth " - "events: %s", - shortstr(event_map.keys()), - ) - return - - logger.info( - "Persisting %i of %i remaining outliers: %s", - len(roots), - len(event_map), - shortstr(e.event_id for e in roots), - ) - - await self._auth_and_persist_outliers_inner(room_id, roots) - - async def _auth_and_persist_outliers_inner( - self, room_id: str, fetched_events: Collection[EventBase] - ) -> None: - """Helper for _auth_and_persist_outliers - - Persists a batch of events where we have (theoretically) already persisted all - of their auth events. - - Marks the events as outliers, auths them, persists them to the database, and, - where appropriate (eg, an invite), awakes the notifier. + sorted_auth_event_ids = sorted_topologically(event_map.keys(), auth_graph) + sorted_auth_events = [event_map[e_id] for e_id in sorted_auth_event_ids] + logger.info( + "Persisting %i remaining outliers: %s", + len(sorted_auth_events), + shortstr(e.event_id for e in sorted_auth_events), + ) - Params: - origin: where the events came from - room_id: the room that the events are meant to be in (though this has - not yet been checked) - fetched_events: the events to persist - """ # get all the auth events for all the events in this batch. By now, they should # have been persisted. 
- auth_events = { - aid for event in fetched_events for aid in event.auth_event_ids() + auth_event_ids = { + aid for event in sorted_auth_events for aid in event.auth_event_ids() + } + auth_map = { + ev.event_id: ev + for ev in sorted_auth_events + if ev.event_id in auth_event_ids } - persisted_events = await self._store.get_events( - auth_events, - allow_rejected=True, - ) + + missing_events = auth_event_ids.difference(auth_map) + if missing_events: + persisted_events = await self._store.get_events( + missing_events, + allow_rejected=True, + redact_behaviour=EventRedactBehaviour.as_is, + ) + auth_map.update(persisted_events) events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = [] @@ -1736,7 +1715,7 @@ class FederationEventHandler: with nested_logging_context(suffix=event.event_id): auth = [] for auth_event_id in event.auth_event_ids(): - ae = persisted_events.get(auth_event_id) + ae = auth_map.get(auth_event_id) if not ae: # the fact we can't find the auth event doesn't mean it doesn't # exist, which means it is premature to reject `event`. 
Instead we @@ -1755,7 +1734,9 @@ class FederationEventHandler: context = EventContext.for_outlier(self._storage_controllers) try: validate_event_for_room_version(event) - await check_state_independent_auth_rules(self._store, event) + await check_state_independent_auth_rules( + self._store, event, batched_auth_events=auth_map + ) check_state_dependent_auth_rules(event, auth) except AuthError as e: logger.warning("Rejecting %r because %s", event, e) @@ -1772,7 +1753,7 @@ class FederationEventHandler: events_and_contexts_to_persist.append((event, context)) - for event in fetched_events: + for event in sorted_auth_events: await prep(event) await self.persist_events_and_notify( -- cgit 1.5.1 From 0a96fa52a214d0cc707dbdf9693118ceddbfc150 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jan 2024 14:42:13 +0000 Subject: Pull less state out if we fail to backfill (#16788) Sometimes we fail to fetch events during backfill due to missing state, and we often end up querying the same bad events periodically (as people backpaginate). In such cases it's likely we will continue to fail to get the state, and therefore we should try *before* loading the state that we have from the DB (as otherwise it's wasted DB and memory). --------- Co-authored-by: reivilibre --- changelog.d/16788.misc | 1 + synapse/handlers/federation_event.py | 21 ++++++++++++--------- 2 files changed, 13 insertions(+), 9 deletions(-) create mode 100644 changelog.d/16788.misc (limited to 'synapse/handlers/federation_event.py') diff --git a/changelog.d/16788.misc b/changelog.d/16788.misc new file mode 100644 index 0000000000..e58a5a7a32 --- /dev/null +++ b/changelog.d/16788.misc @@ -0,0 +1 @@ +Pull less state out of the DB when we retry fetching old events during backfill. 
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 398f19eec0..12837429b9 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1141,16 +1141,8 @@ class FederationEventHandler: partial_state_flags = await self._store.get_partial_state_events(seen) partial_state = any(partial_state_flags.values()) - # Get the state of the events we know about - ours = await self._state_storage_controller.get_state_groups_ids( - room_id, seen, await_full_state=False - ) - # state_maps is a list of mappings from (type, state_key) to event_id - state_maps: List[StateMap[str]] = list(ours.values()) - - # we don't need this any more, let's delete it. - del ours + state_maps: List[StateMap[str]] = [] # Ask the remote server for the states we don't # know about @@ -1169,6 +1161,17 @@ class FederationEventHandler: state_maps.append(remote_state_map) + # Get the state of the events we know about. We do this *after* + # trying to fetch missing state over federation as that might fail + # and then we can skip loading the local state. + ours = await self._state_storage_controller.get_state_groups_ids( + room_id, seen, await_full_state=False + ) + state_maps.extend(ours.values()) + + # we don't need this any more, let's delete it. + del ours + room_version = await self._store.get_room_version_id(room_id) state_map = await self._state_resolution_handler.resolve_events_with_store( room_id, -- cgit 1.5.1