diff --git a/changelog.d/7.misc b/changelog.d/7.misc
new file mode 100644
index 0000000000..63f1fb77ff
--- /dev/null
+++ b/changelog.d/7.misc
@@ -0,0 +1 @@
+Faster partial join to room with complex auth graph.
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 882be905db..398f19eec0 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -94,7 +94,7 @@ from synapse.types import (
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
+from synapse.util.iterutils import batch_iter, partition, sorted_topologically
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
@@ -1678,57 +1678,36 @@ class FederationEventHandler:
# We need to persist an event's auth events before the event.
auth_graph = {
- ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
+ ev.event_id: [e_id for e_id in ev.auth_event_ids() if e_id in event_map]
for ev in event_map.values()
}
- for roots in sorted_topologically_batched(event_map.values(), auth_graph):
- if not roots:
- # if *none* of the remaining events are ready, that means
- # we have a loop. This either means a bug in our logic, or that
- # somebody has managed to create a loop (which requires finding a
- # hash collision in room v2 and later).
- logger.warning(
- "Loop found in auth events while fetching missing state/auth "
- "events: %s",
- shortstr(event_map.keys()),
- )
- return
-
- logger.info(
- "Persisting %i of %i remaining outliers: %s",
- len(roots),
- len(event_map),
- shortstr(e.event_id for e in roots),
- )
-
- await self._auth_and_persist_outliers_inner(room_id, roots)
-
- async def _auth_and_persist_outliers_inner(
- self, room_id: str, fetched_events: Collection[EventBase]
- ) -> None:
- """Helper for _auth_and_persist_outliers
-
- Persists a batch of events where we have (theoretically) already persisted all
- of their auth events.
-
- Marks the events as outliers, auths them, persists them to the database, and,
- where appropriate (eg, an invite), awakes the notifier.
+ sorted_auth_event_ids = sorted_topologically(event_map.keys(), auth_graph)
+ sorted_auth_events = [event_map[e_id] for e_id in sorted_auth_event_ids]
+ logger.info(
+ "Persisting %i remaining outliers: %s",
+ len(sorted_auth_events),
+ shortstr(e.event_id for e in sorted_auth_events),
+ )
- Params:
- origin: where the events came from
- room_id: the room that the events are meant to be in (though this has
- not yet been checked)
- fetched_events: the events to persist
- """
# get all the auth events for all the events in this batch. By now, they should
# have been persisted.
- auth_events = {
- aid for event in fetched_events for aid in event.auth_event_ids()
+ auth_event_ids = {
+ aid for event in sorted_auth_events for aid in event.auth_event_ids()
+ }
+ auth_map = {
+ ev.event_id: ev
+ for ev in sorted_auth_events
+ if ev.event_id in auth_event_ids
}
- persisted_events = await self._store.get_events(
- auth_events,
- allow_rejected=True,
- )
+
+ missing_events = auth_event_ids.difference(auth_map)
+ if missing_events:
+ persisted_events = await self._store.get_events(
+ missing_events,
+ allow_rejected=True,
+ redact_behaviour=EventRedactBehaviour.as_is,
+ )
+ auth_map.update(persisted_events)
events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = []
@@ -1736,7 +1715,7 @@ class FederationEventHandler:
with nested_logging_context(suffix=event.event_id):
auth = []
for auth_event_id in event.auth_event_ids():
- ae = persisted_events.get(auth_event_id)
+ ae = auth_map.get(auth_event_id)
if not ae:
# the fact we can't find the auth event doesn't mean it doesn't
# exist, which means it is premature to reject `event`. Instead we
@@ -1755,7 +1734,9 @@ class FederationEventHandler:
context = EventContext.for_outlier(self._storage_controllers)
try:
validate_event_for_room_version(event)
- await check_state_independent_auth_rules(self._store, event)
+ await check_state_independent_auth_rules(
+ self._store, event, batched_auth_events=auth_map
+ )
check_state_dependent_auth_rules(event, auth)
except AuthError as e:
logger.warning("Rejecting %r because %s", event, e)
@@ -1772,7 +1753,7 @@ class FederationEventHandler:
events_and_contexts_to_persist.append((event, context))
- for event in fetched_events:
+ for event in sorted_auth_events:
await prep(event)
await self.persist_events_and_notify(
|