diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 69f8287b2b..9e188bb51c 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -818,7 +818,7 @@ class FederationEventHandler:
missing_events = missing_desired_events | missing_auth_events
logger.debug("Fetching %i events from remote", len(missing_events))
await self._get_events_and_persist(
- destination=destination, room_id=room_id, events=missing_events
+ destination=destination, room_id=room_id, event_ids=missing_events
)
# we need to make sure we re-load from the database to get the rejected
@@ -1085,12 +1085,12 @@ class FederationEventHandler:
)
async def _get_events_and_persist(
- self, destination: str, room_id: str, events: Iterable[str]
+ self, destination: str, room_id: str, event_ids: Collection[str]
) -> None:
"""Fetch the given events from a server, and persist them as outliers.
This function *does not* recursively get missing auth events of the
- newly fetched events. Callers must include in the `events` argument
+ newly fetched events. Callers must include in the `event_ids` argument
any missing events from the auth chain.
Logs a warning if we can't find the given event.
@@ -1127,28 +1127,78 @@ class FederationEventHandler:
e,
)
- await concurrently_execute(get_event, events, 5)
+ await concurrently_execute(get_event, event_ids, 5)
+ logger.info("Fetched %i events of %i requested", len(event_map), len(event_ids))
- # Make a map of auth events for each event. We do this after fetching
- # all the events as some of the events' auth events will be in the list
- # of requested events.
+ # we now need to auth the events in an order which ensures that each event's
+ # auth_events are authed before the event itself.
+ #
+ # XXX: it might be possible to kick this process off in parallel with fetching
+ # the events.
+ while event_map:
+ # build a list of events whose auth events are not in the queue.
+ roots = tuple(
+ ev
+ for ev in event_map.values()
+ if not any(aid in event_map for aid in ev.auth_event_ids())
+ )
- auth_events = [
- aid
- for event in event_map.values()
- for aid in event.auth_event_ids()
- if aid not in event_map
- ]
+ if not roots:
+ # if *none* of the remaining events are ready, that means
+ # we have a loop. This either means a bug in our logic, or that
+ # somebody has managed to create a loop (which requires finding a
+ # hash collision in room v2 and later).
+ logger.warning(
+ "Loop found in auth events while fetching missing state/auth "
+ "events: %s",
+ shortstr(event_map.keys()),
+ )
+ return
+
+ logger.info(
+ "Persisting %i of %i remaining events", len(roots), len(event_map)
+ )
+
+ await self._auth_and_persist_fetched_events(destination, room_id, roots)
+
+ for ev in roots:
+ del event_map[ev.event_id]
+
+ async def _auth_and_persist_fetched_events(
+ self, origin: str, room_id: str, fetched_events: Collection[EventBase]
+ ) -> None:
+ """Persist the events fetched by _get_events_and_persist.
+
+ The events should not depend on one another, e.g. this should be used to persist
+ a bunch of outliers, but not a chunk of individual events that depend
+ on each other for state calculations.
+
+ We also assume that all of the auth events for all of the events have already
+ been persisted.
+
+ Notifies about the events where appropriate.
+
+ Params:
+ origin: where the events came from
+ room_id: the room that the events are meant to be in (though this has
+ not yet been checked)
+ event_id: map from event_id -> event for the fetched events
+ """
+ # get all the auth events for all the events in this batch. By now, they should
+ # have been persisted.
+ auth_events = {
+ aid for event in fetched_events for aid in event.auth_event_ids()
+ }
persisted_events = await self._store.get_events(
auth_events,
allow_rejected=True,
)
event_infos = []
- for event in event_map.values():
+ for event in fetched_events:
auth = {}
for auth_event_id in event.auth_event_ids():
- ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
+ ae = persisted_events.get(auth_event_id)
if ae:
auth[(ae.type, ae.state_key)] = ae
else:
@@ -1156,27 +1206,6 @@ class FederationEventHandler:
event_infos.append(_NewEventInfo(event, auth))
- if event_infos:
- await self._auth_and_persist_events(
- destination,
- room_id,
- event_infos,
- )
-
- async def _auth_and_persist_events(
- self,
- origin: str,
- room_id: str,
- event_infos: Collection[_NewEventInfo],
- ) -> None:
- """Creates the appropriate contexts and persists events. The events
- should not depend on one another, e.g. this should be used to persist
- a bunch of outliers, but not a chunk of individual events that depend
- on each other for state calculations.
-
- Notifies about the events where appropriate.
- """
-
if not event_infos:
return
|