summary refs log tree commit diff
path: root/synapse/handlers/federation_event.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation_event.py')
-rw-r--r--synapse/handlers/federation_event.py101
1 files changed, 65 insertions, 36 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 69f8287b2b..9e188bb51c 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -818,7 +818,7 @@ class FederationEventHandler:
         missing_events = missing_desired_events | missing_auth_events
         logger.debug("Fetching %i events from remote", len(missing_events))
         await self._get_events_and_persist(
-            destination=destination, room_id=room_id, events=missing_events
+            destination=destination, room_id=room_id, event_ids=missing_events
         )
 
         # we need to make sure we re-load from the database to get the rejected
@@ -1085,12 +1085,12 @@ class FederationEventHandler:
         )
 
     async def _get_events_and_persist(
-        self, destination: str, room_id: str, events: Iterable[str]
+        self, destination: str, room_id: str, event_ids: Collection[str]
     ) -> None:
         """Fetch the given events from a server, and persist them as outliers.
 
         This function *does not* recursively get missing auth events of the
-        newly fetched events. Callers must include in the `events` argument
+        newly fetched events. Callers must include in the `event_ids` argument
         any missing events from the auth chain.
 
         Logs a warning if we can't find the given event.
@@ -1127,28 +1127,78 @@ class FederationEventHandler:
                         e,
                     )
 
-        await concurrently_execute(get_event, events, 5)
+        await concurrently_execute(get_event, event_ids, 5)
+        logger.info("Fetched %i events of %i requested", len(event_map), len(event_ids))
 
-        # Make a map of auth events for each event. We do this after fetching
-        # all the events as some of the events' auth events will be in the list
-        # of requested events.
+        # we now need to auth the events in an order which ensures that each event's
+        # auth_events are authed before the event itself.
+        #
+        # XXX: it might be possible to kick this process off in parallel with fetching
+        # the events.
+        while event_map:
+            # build a list of events whose auth events are not in the queue.
+            roots = tuple(
+                ev
+                for ev in event_map.values()
+                if not any(aid in event_map for aid in ev.auth_event_ids())
+            )
 
-        auth_events = [
-            aid
-            for event in event_map.values()
-            for aid in event.auth_event_ids()
-            if aid not in event_map
-        ]
+            if not roots:
+                # if *none* of the remaining events are ready, that means
+                # we have a loop. This either means a bug in our logic, or that
+                # somebody has managed to create a loop (which requires finding a
+                # hash collision in room v2 and later).
+                logger.warning(
+                    "Loop found in auth events while fetching missing state/auth "
+                    "events: %s",
+                    shortstr(event_map.keys()),
+                )
+                return
+
+            logger.info(
+                "Persisting %i of %i remaining events", len(roots), len(event_map)
+            )
+
+            await self._auth_and_persist_fetched_events(destination, room_id, roots)
+
+            for ev in roots:
+                del event_map[ev.event_id]
+
+    async def _auth_and_persist_fetched_events(
+        self, origin: str, room_id: str, fetched_events: Collection[EventBase]
+    ) -> None:
+        """Persist the events fetched by _get_events_and_persist.
+
+        The events should not depend on one another, e.g. this should be used to persist
+        a bunch of outliers, but not a chunk of individual events that depend
+        on each other for state calculations.
+
+        We also assume that all of the auth events for all of the events have already
+        been persisted.
+
+        Notifies about the events where appropriate.
+
+        Params:
+            origin: where the events came from
+            room_id: the room that the events are meant to be in (though this has
+               not yet been checked)
+            event_id: map from event_id -> event for the fetched events
+        """
+        # get all the auth events for all the events in this batch. By now, they should
+        # have been persisted.
+        auth_events = {
+            aid for event in fetched_events for aid in event.auth_event_ids()
+        }
         persisted_events = await self._store.get_events(
             auth_events,
             allow_rejected=True,
         )
 
         event_infos = []
-        for event in event_map.values():
+        for event in fetched_events:
             auth = {}
             for auth_event_id in event.auth_event_ids():
-                ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
+                ae = persisted_events.get(auth_event_id)
                 if ae:
                     auth[(ae.type, ae.state_key)] = ae
                 else:
@@ -1156,27 +1206,6 @@ class FederationEventHandler:
 
             event_infos.append(_NewEventInfo(event, auth))
 
-        if event_infos:
-            await self._auth_and_persist_events(
-                destination,
-                room_id,
-                event_infos,
-            )
-
-    async def _auth_and_persist_events(
-        self,
-        origin: str,
-        room_id: str,
-        event_infos: Collection[_NewEventInfo],
-    ) -> None:
-        """Creates the appropriate contexts and persists events. The events
-        should not depend on one another, e.g. this should be used to persist
-        a bunch of outliers, but not a chunk of individual events that depend
-        on each other for state calculations.
-
-        Notifies about the events where appropriate.
-        """
-
         if not event_infos:
             return