summary refs log tree commit diff
path: root/synapse/handlers/federation_event.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-01-10 12:29:42 +0000
committerGitHub <noreply@github.com>2024-01-10 12:29:42 +0000
commitc3f2f0f063e5d97e1ae396b85330ea249499f077 (patch)
tree45a2b9055e2c918927f07f9a10d91bb4c2a9e4bc /synapse/handlers/federation_event.py
parentBump authlib from 1.2.1 to 1.3.0 (#16801) (diff)
downloadsynapse-c3f2f0f063e5d97e1ae396b85330ea249499f077.tar.xz
Faster partial join to room with complex auth graph (#7)
Instead of persisting outliers in a bunch of batches, let's just do them
all at once.

This is fine because all `_auth_and_persist_outliers_inner` is doing is
checking the auth rules for each event, which requires the events to be
topologically sorted by the auth graph.
Diffstat (limited to '')
-rw-r--r--synapse/handlers/federation_event.py79
1 files changed, 30 insertions, 49 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 882be905db..398f19eec0 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -94,7 +94,7 @@ from synapse.types import (
 )
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
+from synapse.util.iterutils import batch_iter, partition, sorted_topologically
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import shortstr
 
@@ -1678,57 +1678,36 @@ class FederationEventHandler:
 
         # We need to persist an event's auth events before the event.
         auth_graph = {
-            ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
+            ev.event_id: [e_id for e_id in ev.auth_event_ids() if e_id in event_map]
             for ev in event_map.values()
         }
-        for roots in sorted_topologically_batched(event_map.values(), auth_graph):
-            if not roots:
-                # if *none* of the remaining events are ready, that means
-                # we have a loop. This either means a bug in our logic, or that
-                # somebody has managed to create a loop (which requires finding a
-                # hash collision in room v2 and later).
-                logger.warning(
-                    "Loop found in auth events while fetching missing state/auth "
-                    "events: %s",
-                    shortstr(event_map.keys()),
-                )
-                return
-
-            logger.info(
-                "Persisting %i of %i remaining outliers: %s",
-                len(roots),
-                len(event_map),
-                shortstr(e.event_id for e in roots),
-            )
-
-            await self._auth_and_persist_outliers_inner(room_id, roots)
-
-    async def _auth_and_persist_outliers_inner(
-        self, room_id: str, fetched_events: Collection[EventBase]
-    ) -> None:
-        """Helper for _auth_and_persist_outliers
-
-        Persists a batch of events where we have (theoretically) already persisted all
-        of their auth events.
-
-        Marks the events as outliers, auths them, persists them to the database, and,
-        where appropriate (eg, an invite), awakes the notifier.
+        sorted_auth_event_ids = sorted_topologically(event_map.keys(), auth_graph)
+        sorted_auth_events = [event_map[e_id] for e_id in sorted_auth_event_ids]
+        logger.info(
+            "Persisting %i remaining outliers: %s",
+            len(sorted_auth_events),
+            shortstr(e.event_id for e in sorted_auth_events),
+        )
 
-        Params:
-            origin: where the events came from
-            room_id: the room that the events are meant to be in (though this has
-               not yet been checked)
-            fetched_events: the events to persist
-        """
         # get all the auth events for all the events in this batch. By now, they should
         # have been persisted.
-        auth_events = {
-            aid for event in fetched_events for aid in event.auth_event_ids()
+        auth_event_ids = {
+            aid for event in sorted_auth_events for aid in event.auth_event_ids()
+        }
+        auth_map = {
+            ev.event_id: ev
+            for ev in sorted_auth_events
+            if ev.event_id in auth_event_ids
         }
-        persisted_events = await self._store.get_events(
-            auth_events,
-            allow_rejected=True,
-        )
+
+        missing_events = auth_event_ids.difference(auth_map)
+        if missing_events:
+            persisted_events = await self._store.get_events(
+                missing_events,
+                allow_rejected=True,
+                redact_behaviour=EventRedactBehaviour.as_is,
+            )
+            auth_map.update(persisted_events)
 
         events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = []
 
@@ -1736,7 +1715,7 @@ class FederationEventHandler:
             with nested_logging_context(suffix=event.event_id):
                 auth = []
                 for auth_event_id in event.auth_event_ids():
-                    ae = persisted_events.get(auth_event_id)
+                    ae = auth_map.get(auth_event_id)
                     if not ae:
                         # the fact we can't find the auth event doesn't mean it doesn't
                         # exist, which means it is premature to reject `event`. Instead we
@@ -1755,7 +1734,9 @@ class FederationEventHandler:
                 context = EventContext.for_outlier(self._storage_controllers)
                 try:
                     validate_event_for_room_version(event)
-                    await check_state_independent_auth_rules(self._store, event)
+                    await check_state_independent_auth_rules(
+                        self._store, event, batched_auth_events=auth_map
+                    )
                     check_state_dependent_auth_rules(event, auth)
                 except AuthError as e:
                     logger.warning("Rejecting %r because %s", event, e)
@@ -1772,7 +1753,7 @@ class FederationEventHandler:
 
             events_and_contexts_to_persist.append((event, context))
 
-        for event in fetched_events:
+        for event in sorted_auth_events:
             await prep(event)
 
         await self.persist_events_and_notify(