diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/federation_event.py | 103 |
1 files changed, 48 insertions, 55 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 4eefcc36d8..8fd9e51044 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1080,7 +1080,7 @@ class FederationEventHandler: room_version = await self._store.get_room_version(room_id) - event_map: Dict[str, EventBase] = {} + events: List[EventBase] = [] async def get_event(event_id: str) -> None: with nested_logging_context(event_id): @@ -1098,8 +1098,7 @@ class FederationEventHandler: event_id, ) return - - event_map[event.event_id] = event + events.append(event) except Exception as e: logger.warning( @@ -1110,11 +1109,29 @@ class FederationEventHandler: ) await concurrently_execute(get_event, event_ids, 5) - logger.info("Fetched %i events of %i requested", len(event_map), len(event_ids)) + logger.info("Fetched %i events of %i requested", len(events), len(event_ids)) + await self._auth_and_persist_fetched_events(destination, room_id, events) + + async def _auth_and_persist_fetched_events( + self, origin: str, room_id: str, events: Iterable[EventBase] + ) -> None: + """Persist the events fetched by _get_events_and_persist or _get_remote_auth_chain_for_event + + The events to be persisted must be outliers. + + We first sort the events to make sure that we process each event's auth_events + before the event itself, and then auth and persist them. + + Notifies about the events where appropriate. + + Params: + origin: where the events came from + room_id: the room that the events are meant to be in (though this has + not yet been checked) + events: the events that have been fetched + """ + event_map = {event.event_id: event for event in events} - # we now need to auth the events in an order which ensures that each event's - # auth_events are authed before the event itself. - # # XXX: it might be possible to kick this process off in parallel with fetching # the events. while event_map: @@ -1141,22 +1158,18 @@ class FederationEventHandler: "Persisting %i of %i remaining events", len(roots), len(event_map) ) - await self._auth_and_persist_fetched_events(destination, room_id, roots) + await self._auth_and_persist_fetched_events_inner(origin, room_id, roots) for ev in roots: del event_map[ev.event_id] - async def _auth_and_persist_fetched_events( + async def _auth_and_persist_fetched_events_inner( self, origin: str, room_id: str, fetched_events: Collection[EventBase] ) -> None: - """Persist the events fetched by _get_events_and_persist. + """Helper for _auth_and_persist_fetched_events - The events should not depend on one another, e.g. this should be used to persist - a bunch of outliers, but not a chunk of individual events that depend - on each other for state calculations. - - We also assume that all of the auth events for all of the events have already - been persisted. + Persists a batch of events where we have (theoretically) already persisted all + of their auth events. Notifies about the events where appropriate. @@ -1164,7 +1177,7 @@ class FederationEventHandler: origin: where the events came from room_id: the room that the events are meant to be in (though this has not yet been checked) - event_id: map from event_id -> event for the fetched events + fetched_events: the events to persist """ # get all the auth events for all the events in this batch. By now, they should # have been persisted. @@ -1558,53 +1571,33 @@ class FederationEventHandler: event_id: the event for which we are lacking auth events """ try: - remote_auth_chain = await self._federation_client.get_event_auth( - destination, room_id, event_id - ) + remote_event_map = { + e.event_id: e + for e in await self._federation_client.get_event_auth( + destination, room_id, event_id + ) + } except RequestSendFailed as e1: # The other side isn't around or doesn't implement the # endpoint, so lets just bail out. logger.info("Failed to get event auth from remote: %s", e1) return - seen_remotes = await self._store.have_seen_events( - room_id, [e.event_id for e in remote_auth_chain] - ) + logger.info("/event_auth returned %i events", len(remote_event_map)) - for auth_event in remote_auth_chain: - if auth_event.event_id in seen_remotes: - continue + # `event` may be returned, but we should not yet process it. + remote_event_map.pop(event_id, None) - if auth_event.event_id == event_id: - continue + # nor should we reprocess any events we have already seen. + seen_remotes = await self._store.have_seen_events( + room_id, remote_event_map.keys() + ) + for s in seen_remotes: + remote_event_map.pop(s, None) - try: - auth_ids = auth_event.auth_event_ids() - auth = { - (e.type, e.state_key): e - for e in remote_auth_chain - if e.event_id in auth_ids or e.type == EventTypes.Create - } - auth_event.internal_metadata.outlier = True - - logger.debug( - "_check_event_auth %s missing_auth: %s", - event_id, - auth_event.event_id, - ) - missing_auth_event_context = EventContext.for_outlier() - missing_auth_event_context = await self._check_event_auth( - destination, - auth_event, - missing_auth_event_context, - claimed_auth_event_map=auth, - ) - await self.persist_events_and_notify( - room_id, - [(auth_event, missing_auth_event_context)], - ) - except AuthError: - pass + await self._auth_and_persist_fetched_events( + destination, room_id, remote_event_map.values() + ) async def _update_context_for_auth_events( self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase] |