diff options
Diffstat (limited to 'synapse/handlers/federation_event.py')
-rw-r--r-- | synapse/handlers/federation_event.py | 101 |
1 files changed, 65 insertions, 36 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 69f8287b2b..9e188bb51c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -818,7 +818,7 @@ class FederationEventHandler: missing_events = missing_desired_events | missing_auth_events logger.debug("Fetching %i events from remote", len(missing_events)) await self._get_events_and_persist( - destination=destination, room_id=room_id, events=missing_events + destination=destination, room_id=room_id, event_ids=missing_events ) # we need to make sure we re-load from the database to get the rejected @@ -1085,12 +1085,12 @@ class FederationEventHandler: ) async def _get_events_and_persist( - self, destination: str, room_id: str, events: Iterable[str] + self, destination: str, room_id: str, event_ids: Collection[str] ) -> None: """Fetch the given events from a server, and persist them as outliers. This function *does not* recursively get missing auth events of the - newly fetched events. Callers must include in the `events` argument + newly fetched events. Callers must include in the `event_ids` argument any missing events from the auth chain. Logs a warning if we can't find the given event. @@ -1127,28 +1127,78 @@ class FederationEventHandler: e, ) - await concurrently_execute(get_event, events, 5) + await concurrently_execute(get_event, event_ids, 5) + logger.info("Fetched %i events of %i requested", len(event_map), len(event_ids)) - # Make a map of auth events for each event. We do this after fetching - # all the events as some of the events' auth events will be in the list - # of requested events. + # we now need to auth the events in an order which ensures that each event's + # auth_events are authed before the event itself. + # + # XXX: it might be possible to kick this process off in parallel with fetching + # the events. + while event_map: + # build a list of events whose auth events are not in the queue. + roots = tuple( + ev + for ev in event_map.values() + if not any(aid in event_map for aid in ev.auth_event_ids()) + ) - auth_events = [ - aid - for event in event_map.values() - for aid in event.auth_event_ids() - if aid not in event_map - ] + if not roots: + # if *none* of the remaining events are ready, that means + # we have a loop. This either means a bug in our logic, or that + # somebody has managed to create a loop (which requires finding a + # hash collision in room v2 and later). + logger.warning( + "Loop found in auth events while fetching missing state/auth " + "events: %s", + shortstr(event_map.keys()), + ) + return + + logger.info( + "Persisting %i of %i remaining events", len(roots), len(event_map) + ) + + await self._auth_and_persist_fetched_events(destination, room_id, roots) + + for ev in roots: + del event_map[ev.event_id] + + async def _auth_and_persist_fetched_events( + self, origin: str, room_id: str, fetched_events: Collection[EventBase] + ) -> None: + """Persist the events fetched by _get_events_and_persist. + + The events should not depend on one another, e.g. this should be used to persist + a bunch of outliers, but not a chunk of individual events that depend + on each other for state calculations. + + We also assume that all of the auth events for all of the events have already + been persisted. + + Notifies about the events where appropriate. + + Params: + origin: where the events came from + room_id: the room that the events are meant to be in (though this has + not yet been checked) + event_id: map from event_id -> event for the fetched events + """ + # get all the auth events for all the events in this batch. By now, they should + # have been persisted. + auth_events = { + aid for event in fetched_events for aid in event.auth_event_ids() + } persisted_events = await self._store.get_events( auth_events, allow_rejected=True, ) event_infos = [] - for event in event_map.values(): + for event in fetched_events: auth = {} for auth_event_id in event.auth_event_ids(): - ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id) + ae = persisted_events.get(auth_event_id) if ae: auth[(ae.type, ae.state_key)] = ae else: @@ -1156,27 +1206,6 @@ class FederationEventHandler: event_infos.append(_NewEventInfo(event, auth)) - if event_infos: - await self._auth_and_persist_events( - destination, - room_id, - event_infos, - ) - - async def _auth_and_persist_events( - self, - origin: str, - room_id: str, - event_infos: Collection[_NewEventInfo], - ) -> None: - """Creates the appropriate contexts and persists events. The events - should not depend on one another, e.g. this should be used to persist - a bunch of outliers, but not a chunk of individual events that depend - on each other for state calculations. - - Notifies about the events where appropriate. - """ - if not event_infos: return |