diff options
Diffstat (limited to 'synapse/handlers/federation_event.py')
-rw-r--r-- | synapse/handlers/federation_event.py | 284 |
1 files changed, 248 insertions, 36 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index f640b417b3..5a2f2e5ebb 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import itertools import logging from http import HTTPStatus from typing import ( @@ -45,7 +46,7 @@ from synapse.api.errors import ( RequestSendFailed, SynapseError, ) -from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions from synapse.event_auth import ( auth_types_for_event, check_auth_rules_for_event, @@ -214,7 +215,7 @@ class FederationEventHandler: if missing_prevs: # We only backfill backwards to the min depth. - min_depth = await self.get_min_depth_for_context(pdu.room_id) + min_depth = await self._store.get_min_depth(pdu.room_id) logger.debug("min_depth: %d", min_depth) if min_depth is not None and pdu.depth > min_depth: @@ -390,9 +391,122 @@ class FederationEventHandler: prev_member_event, ) + async def process_remote_join( + self, + origin: str, + room_id: str, + auth_events: List[EventBase], + state: List[EventBase], + event: EventBase, + room_version: RoomVersion, + ) -> int: + """Persists the events returned by a send_join + + Checks the auth chain is valid (and passes auth checks) for the + state and event. Then persists the auth chain and state atomically. + Persists the event separately. Notifies about the persisted events + where appropriate. + + Will attempt to fetch missing auth events. + + Args: + origin: Where the events came from + room_id, + auth_events + state + event + room_version: The room version we expect this room to have, and + will raise if it doesn't match the version in the create event. 
+ """ + events_to_context = {} + for e in itertools.chain(auth_events, state): + e.internal_metadata.outlier = True + events_to_context[e.event_id] = EventContext.for_outlier() + + event_map = { + e.event_id: e for e in itertools.chain(auth_events, state, [event]) + } + + create_event = None + for e in auth_events: + if (e.type, e.state_key) == (EventTypes.Create, ""): + create_event = e + break + + if create_event is None: + # If the state doesn't have a create event then the room is + # invalid, and it would fail auth checks anyway. + raise SynapseError(400, "No create event in state") + + room_version_id = create_event.content.get( + "room_version", RoomVersions.V1.identifier + ) + + if room_version.identifier != room_version_id: + raise SynapseError(400, "Room version mismatch") + + missing_auth_events = set() + for e in itertools.chain(auth_events, state, [event]): + for e_id in e.auth_event_ids(): + if e_id not in event_map: + missing_auth_events.add(e_id) + + for e_id in missing_auth_events: + m_ev = await self._federation_client.get_pdu( + [origin], + e_id, + room_version=room_version, + outlier=True, + timeout=10000, + ) + if m_ev and m_ev.event_id == e_id: + event_map[e_id] = m_ev + else: + logger.info("Failed to find auth event %r", e_id) + + for e in itertools.chain(auth_events, state, [event]): + auth_for_e = [ + event_map[e_id] for e_id in e.auth_event_ids() if e_id in event_map + ] + if create_event: + auth_for_e.append(create_event) + + try: + validate_event_for_room_version(room_version, e) + check_auth_rules_for_event(room_version, e, auth_for_e) + except SynapseError as err: + # we may get SynapseErrors here as well as AuthErrors. For + # instance, there are a couple of (ancient) events in some + # rooms whose senders do not have the correct sigil; these + # cause SynapseErrors in auth.check. We don't want to give up + # the attempt to federate altogether in such cases. 
+ + logger.warning("Rejecting %s because %s", e.event_id, err.msg) + + if e == event: + raise + events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR + + if auth_events or state: + await self.persist_events_and_notify( + room_id, + [ + (e, events_to_context[e.event_id]) + for e in itertools.chain(auth_events, state) + ], + ) + + new_event_context = await self._state_handler.compute_event_context( + event, old_state=state + ) + + return await self.persist_events_and_notify( + room_id, [(event, new_event_context)] + ) + @log_function async def backfill( - self, dest: str, room_id: str, limit: int, extremities: List[str] + self, dest: str, room_id: str, limit: int, extremities: Iterable[str] ) -> None: """Trigger a backfill request to `dest` for the given `room_id` @@ -1116,14 +1230,12 @@ class FederationEventHandler: await concurrently_execute(get_event, event_ids, 5) logger.info("Fetched %i events of %i requested", len(events), len(event_ids)) - await self._auth_and_persist_fetched_events(destination, room_id, events) + await self._auth_and_persist_outliers(room_id, events) - async def _auth_and_persist_fetched_events( - self, origin: str, room_id: str, events: Iterable[EventBase] + async def _auth_and_persist_outliers( + self, room_id: str, events: Iterable[EventBase] ) -> None: - """Persist the events fetched by _get_events_and_persist or _get_remote_auth_chain_for_event - - The events to be persisted must be outliers. + """Persist a batch of outlier events fetched from remote servers. We first sort the events to make sure that we process each event's auth_events before the event itself, and then auth and persist them. @@ -1131,7 +1243,6 @@ class FederationEventHandler: Notifies about the events where appropriate. 
Params: - origin: where the events came from room_id: the room that the events are meant to be in (though this has not yet been checked) events: the events that have been fetched @@ -1167,15 +1278,15 @@ class FederationEventHandler: shortstr(e.event_id for e in roots), ) - await self._auth_and_persist_fetched_events_inner(origin, room_id, roots) + await self._auth_and_persist_outliers_inner(room_id, roots) for ev in roots: del event_map[ev.event_id] - async def _auth_and_persist_fetched_events_inner( - self, origin: str, room_id: str, fetched_events: Collection[EventBase] + async def _auth_and_persist_outliers_inner( + self, room_id: str, fetched_events: Collection[EventBase] ) -> None: - """Helper for _auth_and_persist_fetched_events + """Helper for _auth_and_persist_outliers Persists a batch of events where we have (theoretically) already persisted all of their auth events. @@ -1203,7 +1314,7 @@ class FederationEventHandler: def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]: with nested_logging_context(suffix=event.event_id): - auth = {} + auth = [] for auth_event_id in event.auth_event_ids(): ae = persisted_events.get(auth_event_id) if not ae: @@ -1216,7 +1327,7 @@ class FederationEventHandler: # exist, which means it is premature to reject `event`. Instead we # just ignore it for now. return None - auth[(ae.type, ae.state_key)] = ae + auth.append(ae) context = EventContext.for_outlier() try: @@ -1256,6 +1367,10 @@ class FederationEventHandler: Returns: The updated context object. + + Raises: + AuthError if we were unable to find copies of the event's auth events. + (Most other failures just cause us to set `context.rejected`.) """ # This method should only be used for non-outliers assert not event.internal_metadata.outlier @@ -1272,7 +1387,26 @@ class FederationEventHandler: context.rejected = RejectedReason.AUTH_ERROR return context - # calculate what the auth events *should* be, to use as a basis for auth. 
+ # next, check that we have all of the event's auth events. + # + # Note that this can raise AuthError, which we want to propagate to the + # caller rather than swallow with `context.rejected` (since we cannot be + # certain that there is a permanent problem with the event). + claimed_auth_events = await self._load_or_fetch_auth_events_for_event( + origin, event + ) + + # ... and check that the event passes auth at those auth events. + try: + check_auth_rules_for_event(room_version_obj, event, claimed_auth_events) + except AuthError as e: + logger.warning( + "While checking auth of %r against auth_events: %s", event, e + ) + context.rejected = RejectedReason.AUTH_ERROR + return context + + # now check auth against what we think the auth events *should* be. prev_state_ids = await context.get_prev_state_ids() auth_events_ids = self._event_auth_handler.compute_auth_events( event, prev_state_ids, for_verification=True @@ -1305,7 +1439,9 @@ class FederationEventHandler: auth_events_for_auth = calculated_auth_event_map try: - check_auth_rules_for_event(room_version_obj, event, auth_events_for_auth) + check_auth_rules_for_event( + room_version_obj, event, auth_events_for_auth.values() + ) except AuthError as e: logger.warning("Failed auth resolution for %r because %s", event, e) context.rejected = RejectedReason.AUTH_ERROR @@ -1403,11 +1539,9 @@ class FederationEventHandler: current_state_ids_list = [ e for k, e in current_state_ids.items() if k in auth_types ] - - auth_events_map = await self._store.get_events(current_state_ids_list) - current_auth_events = { - (e.type, e.state_key): e for e in auth_events_map.values() - } + current_auth_events = await self._store.get_events_as_list( + current_state_ids_list + ) try: check_auth_rules_for_event(room_version_obj, event, current_auth_events) @@ -1472,6 +1606,9 @@ class FederationEventHandler: # if we have missing events, we need to fetch those events from somewhere. 
# # we start by checking if they are in the store, and then try calling /event_auth/. + # + # TODO: this code is now redundant, since it should be impossible for us to + # get here without already having the auth events. if missing_auth: have_events = await self._store.have_seen_events( event.room_id, missing_auth @@ -1575,7 +1712,7 @@ class FederationEventHandler: logger.info( "After state res: updating auth_events with new state %s", { - (d.type, d.state_key): d.event_id + d for d in new_state.values() if auth_events.get((d.type, d.state_key)) != d }, @@ -1589,6 +1726,75 @@ class FederationEventHandler: return context, auth_events + async def _load_or_fetch_auth_events_for_event( + self, destination: str, event: EventBase + ) -> Collection[EventBase]: + """Fetch this event's auth_events, from database or remote + + Loads any of the auth_events that we already have from the database/cache. If + there are any that are missing, calls /event_auth to get the complete auth + chain for the event (and then attempts to load the auth_events again). + + If any of the auth_events cannot be found, raises an AuthError. This can happen + for a number of reasons; eg: the events don't exist, or we were unable to talk + to `destination`, or we couldn't validate the signature on the event (which + in turn has multiple potential causes). + + Args: + destination: where to send the /event_auth request. Typically the server + that sent us `event` in the first place. + event: the event whose auth_events we want + + Returns: + all of the events in `event.auth_events`, after deduplication + + Raises: + AuthError if we were unable to fetch the auth_events for any reason. 
+ """ + event_auth_event_ids = set(event.auth_event_ids()) + event_auth_events = await self._store.get_events( + event_auth_event_ids, allow_rejected=True + ) + missing_auth_event_ids = event_auth_event_ids.difference( + event_auth_events.keys() + ) + if not missing_auth_event_ids: + return event_auth_events.values() + + logger.info( + "Event %s refers to unknown auth events %s: fetching auth chain", + event, + missing_auth_event_ids, + ) + try: + await self._get_remote_auth_chain_for_event( + destination, event.room_id, event.event_id + ) + except Exception as e: + logger.warning("Failed to get auth chain for %s: %s", event, e) + # in this case, it's very likely we still won't have all the auth + # events - but we pick that up below. + + # try to fetch the auth events we missed last time. + extra_auth_events = await self._store.get_events( + missing_auth_event_ids, allow_rejected=True + ) + missing_auth_event_ids.difference_update(extra_auth_events.keys()) + event_auth_events.update(extra_auth_events) + if not missing_auth_event_ids: + return event_auth_events.values() + + # we still don't have all the auth events. + logger.warning( + "Missing auth events for %s: %s", + event, + shortstr(missing_auth_event_ids), + ) + # the fact we can't find the auth event doesn't mean it doesn't + # exist, which means it is premature to store `event` as rejected. + # instead we raise an AuthError, which will make the caller ignore it. 
+ raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found") + async def _get_remote_auth_chain_for_event( self, destination: str, room_id: str, event_id: str ) -> None: @@ -1624,9 +1830,7 @@ class FederationEventHandler: for s in seen_remotes: remote_event_map.pop(s, None) - await self._auth_and_persist_fetched_events( - destination, room_id, remote_event_map.values() - ) + await self._auth_and_persist_outliers(room_id, remote_event_map.values()) async def _update_context_for_auth_events( self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase] @@ -1696,16 +1900,27 @@ class FederationEventHandler: # persist_events_and_notify directly.) assert not event.internal_metadata.outlier - try: - if ( - not backfilled - and not context.rejected - and (await self._store.get_min_depth(event.room_id)) <= event.depth - ): + if not backfilled and not context.rejected: + min_depth = await self._store.get_min_depth(event.room_id) + if min_depth is None or min_depth > event.depth: + # XXX richvdh 2021/10/07: I don't really understand what this + # condition is doing. I think it's trying not to send pushes + # for events that predate our join - but that's not really what + # min_depth means, and anyway ancient events are a more general + # problem. + # + # for now I'm just going to log about it. + logger.info( + "Skipping push actions for old event with depth %s < %s", + event.depth, + min_depth, + ) + else: await self._action_generator.handle_push_actions_for_event( event, context ) + try: await self.persist_events_and_notify( event.room_id, [(event, context)], backfilled=backfilled ) @@ -1837,6 +2052,3 @@ class FederationEventHandler: len(ev.auth_event_ids()), ) raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events") - - async def get_min_depth_for_context(self, context: str) -> int: - return await self._store.get_min_depth(context) |