summary refs log tree commit diff
path: root/synapse/handlers/federation_event.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation_event.py')
-rw-r--r--synapse/handlers/federation_event.py284
1 files changed, 248 insertions, 36 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index f640b417b3..5a2f2e5ebb 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import itertools
 import logging
 from http import HTTPStatus
 from typing import (
@@ -45,7 +46,7 @@ from synapse.api.errors import (
     RequestSendFailed,
     SynapseError,
 )
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions
 from synapse.event_auth import (
     auth_types_for_event,
     check_auth_rules_for_event,
@@ -214,7 +215,7 @@ class FederationEventHandler:
 
         if missing_prevs:
             # We only backfill backwards to the min depth.
-            min_depth = await self.get_min_depth_for_context(pdu.room_id)
+            min_depth = await self._store.get_min_depth(pdu.room_id)
             logger.debug("min_depth: %d", min_depth)
 
             if min_depth is not None and pdu.depth > min_depth:
@@ -390,9 +391,122 @@ class FederationEventHandler:
             prev_member_event,
         )
 
+    async def process_remote_join(
+        self,
+        origin: str,
+        room_id: str,
+        auth_events: List[EventBase],
+        state: List[EventBase],
+        event: EventBase,
+        room_version: RoomVersion,
+    ) -> int:
+        """Persists the events returned by a send_join
+
+        Checks the auth chain is valid (and passes auth checks) for the
+        state and event. Then persists the auth chain and state atomically.
+        Persists the event separately. Notifies about the persisted events
+        where appropriate.
+
+        Will attempt to fetch missing auth events.
+
+        Args:
+            origin: Where the events came from
+            room_id,
+            auth_events
+            state
+            event
+            room_version: The room version we expect this room to have, and
+                will raise if it doesn't match the version in the create event.
+        """
+        events_to_context = {}
+        for e in itertools.chain(auth_events, state):
+            e.internal_metadata.outlier = True
+            events_to_context[e.event_id] = EventContext.for_outlier()
+
+        event_map = {
+            e.event_id: e for e in itertools.chain(auth_events, state, [event])
+        }
+
+        create_event = None
+        for e in auth_events:
+            if (e.type, e.state_key) == (EventTypes.Create, ""):
+                create_event = e
+                break
+
+        if create_event is None:
+            # If the state doesn't have a create event then the room is
+            # invalid, and it would fail auth checks anyway.
+            raise SynapseError(400, "No create event in state")
+
+        room_version_id = create_event.content.get(
+            "room_version", RoomVersions.V1.identifier
+        )
+
+        if room_version.identifier != room_version_id:
+            raise SynapseError(400, "Room version mismatch")
+
+        missing_auth_events = set()
+        for e in itertools.chain(auth_events, state, [event]):
+            for e_id in e.auth_event_ids():
+                if e_id not in event_map:
+                    missing_auth_events.add(e_id)
+
+        for e_id in missing_auth_events:
+            m_ev = await self._federation_client.get_pdu(
+                [origin],
+                e_id,
+                room_version=room_version,
+                outlier=True,
+                timeout=10000,
+            )
+            if m_ev and m_ev.event_id == e_id:
+                event_map[e_id] = m_ev
+            else:
+                logger.info("Failed to find auth event %r", e_id)
+
+        for e in itertools.chain(auth_events, state, [event]):
+            auth_for_e = [
+                event_map[e_id] for e_id in e.auth_event_ids() if e_id in event_map
+            ]
+            if create_event:
+                auth_for_e.append(create_event)
+
+            try:
+                validate_event_for_room_version(room_version, e)
+                check_auth_rules_for_event(room_version, e, auth_for_e)
+            except SynapseError as err:
+                # we may get SynapseErrors here as well as AuthErrors. For
+                # instance, there are a couple of (ancient) events in some
+                # rooms whose senders do not have the correct sigil; these
+                # cause SynapseErrors in auth.check. We don't want to give up
+                # the attempt to federate altogether in such cases.
+
+                logger.warning("Rejecting %s because %s", e.event_id, err.msg)
+
+                if e == event:
+                    raise
+                events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
+
+        if auth_events or state:
+            await self.persist_events_and_notify(
+                room_id,
+                [
+                    (e, events_to_context[e.event_id])
+                    for e in itertools.chain(auth_events, state)
+                ],
+            )
+
+        new_event_context = await self._state_handler.compute_event_context(
+            event, old_state=state
+        )
+
+        return await self.persist_events_and_notify(
+            room_id, [(event, new_event_context)]
+        )
+
     @log_function
     async def backfill(
-        self, dest: str, room_id: str, limit: int, extremities: List[str]
+        self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
     ) -> None:
         """Trigger a backfill request to `dest` for the given `room_id`
 
@@ -1116,14 +1230,12 @@ class FederationEventHandler:
 
         await concurrently_execute(get_event, event_ids, 5)
         logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
-        await self._auth_and_persist_fetched_events(destination, room_id, events)
+        await self._auth_and_persist_outliers(room_id, events)
 
-    async def _auth_and_persist_fetched_events(
-        self, origin: str, room_id: str, events: Iterable[EventBase]
+    async def _auth_and_persist_outliers(
+        self, room_id: str, events: Iterable[EventBase]
     ) -> None:
-        """Persist the events fetched by _get_events_and_persist or _get_remote_auth_chain_for_event
-
-        The events to be persisted must be outliers.
+        """Persist a batch of outlier events fetched from remote servers.
 
         We first sort the events to make sure that we process each event's auth_events
         before the event itself, and then auth and persist them.
@@ -1131,7 +1243,6 @@ class FederationEventHandler:
         Notifies about the events where appropriate.
 
         Params:
-            origin: where the events came from
             room_id: the room that the events are meant to be in (though this has
                not yet been checked)
             events: the events that have been fetched
@@ -1167,15 +1278,15 @@ class FederationEventHandler:
                 shortstr(e.event_id for e in roots),
             )
 
-            await self._auth_and_persist_fetched_events_inner(origin, room_id, roots)
+            await self._auth_and_persist_outliers_inner(room_id, roots)
 
             for ev in roots:
                 del event_map[ev.event_id]
 
-    async def _auth_and_persist_fetched_events_inner(
-        self, origin: str, room_id: str, fetched_events: Collection[EventBase]
+    async def _auth_and_persist_outliers_inner(
+        self, room_id: str, fetched_events: Collection[EventBase]
     ) -> None:
-        """Helper for _auth_and_persist_fetched_events
+        """Helper for _auth_and_persist_outliers
 
         Persists a batch of events where we have (theoretically) already persisted all
         of their auth events.
@@ -1203,7 +1314,7 @@ class FederationEventHandler:
 
         def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
             with nested_logging_context(suffix=event.event_id):
-                auth = {}
+                auth = []
                 for auth_event_id in event.auth_event_ids():
                     ae = persisted_events.get(auth_event_id)
                     if not ae:
@@ -1216,7 +1327,7 @@ class FederationEventHandler:
                         # exist, which means it is premature to reject `event`. Instead we
                         # just ignore it for now.
                         return None
-                    auth[(ae.type, ae.state_key)] = ae
+                    auth.append(ae)
 
                 context = EventContext.for_outlier()
                 try:
@@ -1256,6 +1367,10 @@ class FederationEventHandler:
 
         Returns:
             The updated context object.
+
+        Raises:
+            AuthError if we were unable to find copies of the event's auth events.
+               (Most other failures just cause us to set `context.rejected`.)
         """
         # This method should only be used for non-outliers
         assert not event.internal_metadata.outlier
@@ -1272,7 +1387,26 @@ class FederationEventHandler:
             context.rejected = RejectedReason.AUTH_ERROR
             return context
 
-        # calculate what the auth events *should* be, to use as a basis for auth.
+        # next, check that we have all of the event's auth events.
+        #
+        # Note that this can raise AuthError, which we want to propagate to the
+        # caller rather than swallow with `context.rejected` (since we cannot be
+        # certain that there is a permanent problem with the event).
+        claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
+            origin, event
+        )
+
+        # ... and check that the event passes auth at those auth events.
+        try:
+            check_auth_rules_for_event(room_version_obj, event, claimed_auth_events)
+        except AuthError as e:
+            logger.warning(
+                "While checking auth of %r against auth_events: %s", event, e
+            )
+            context.rejected = RejectedReason.AUTH_ERROR
+            return context
+
+        # now check auth against what we think the auth events *should* be.
         prev_state_ids = await context.get_prev_state_ids()
         auth_events_ids = self._event_auth_handler.compute_auth_events(
             event, prev_state_ids, for_verification=True
@@ -1305,7 +1439,9 @@ class FederationEventHandler:
             auth_events_for_auth = calculated_auth_event_map
 
         try:
-            check_auth_rules_for_event(room_version_obj, event, auth_events_for_auth)
+            check_auth_rules_for_event(
+                room_version_obj, event, auth_events_for_auth.values()
+            )
         except AuthError as e:
             logger.warning("Failed auth resolution for %r because %s", event, e)
             context.rejected = RejectedReason.AUTH_ERROR
@@ -1403,11 +1539,9 @@ class FederationEventHandler:
         current_state_ids_list = [
             e for k, e in current_state_ids.items() if k in auth_types
         ]
-
-        auth_events_map = await self._store.get_events(current_state_ids_list)
-        current_auth_events = {
-            (e.type, e.state_key): e for e in auth_events_map.values()
-        }
+        current_auth_events = await self._store.get_events_as_list(
+            current_state_ids_list
+        )
 
         try:
             check_auth_rules_for_event(room_version_obj, event, current_auth_events)
@@ -1472,6 +1606,9 @@ class FederationEventHandler:
         # if we have missing events, we need to fetch those events from somewhere.
         #
         # we start by checking if they are in the store, and then try calling /event_auth/.
+        #
+        # TODO: this code is now redundant, since it should be impossible for us to
+        #   get here without already having the auth events.
         if missing_auth:
             have_events = await self._store.have_seen_events(
                 event.room_id, missing_auth
@@ -1575,7 +1712,7 @@ class FederationEventHandler:
         logger.info(
             "After state res: updating auth_events with new state %s",
             {
-                (d.type, d.state_key): d.event_id
+                d
                 for d in new_state.values()
                 if auth_events.get((d.type, d.state_key)) != d
             },
@@ -1589,6 +1726,75 @@ class FederationEventHandler:
 
         return context, auth_events
 
+    async def _load_or_fetch_auth_events_for_event(
+        self, destination: str, event: EventBase
+    ) -> Collection[EventBase]:
+        """Fetch this event's auth_events, from database or remote
+
+        Loads any of the auth_events that we already have from the database/cache. If
+        there are any that are missing, calls /event_auth to get the complete auth
+        chain for the event (and then attempts to load the auth_events again).
+
+        If any of the auth_events cannot be found, raises an AuthError. This can happen
+        for a number of reasons; eg: the events don't exist, or we were unable to talk
+        to `destination`, or we couldn't validate the signature on the event (which
+        in turn has multiple potential causes).
+
+        Args:
+            destination: where to send the /event_auth request. Typically the server
+               that sent us `event` in the first place.
+            event: the event whose auth_events we want
+
+        Returns:
+            all of the events in `event.auth_events`, after deduplication
+
+        Raises:
+            AuthError if we were unable to fetch the auth_events for any reason.
+        """
+        event_auth_event_ids = set(event.auth_event_ids())
+        event_auth_events = await self._store.get_events(
+            event_auth_event_ids, allow_rejected=True
+        )
+        missing_auth_event_ids = event_auth_event_ids.difference(
+            event_auth_events.keys()
+        )
+        if not missing_auth_event_ids:
+            return event_auth_events.values()
+
+        logger.info(
+            "Event %s refers to unknown auth events %s: fetching auth chain",
+            event,
+            missing_auth_event_ids,
+        )
+        try:
+            await self._get_remote_auth_chain_for_event(
+                destination, event.room_id, event.event_id
+            )
+        except Exception as e:
+            logger.warning("Failed to get auth chain for %s: %s", event, e)
+            # in this case, it's very likely we still won't have all the auth
+            # events - but we pick that up below.
+
+        # try to fetch the auth events we missed list time.
+        extra_auth_events = await self._store.get_events(
+            missing_auth_event_ids, allow_rejected=True
+        )
+        missing_auth_event_ids.difference_update(extra_auth_events.keys())
+        event_auth_events.update(extra_auth_events)
+        if not missing_auth_event_ids:
+            return event_auth_events.values()
+
+        # we still don't have all the auth events.
+        logger.warning(
+            "Missing auth events for %s: %s",
+            event,
+            shortstr(missing_auth_event_ids),
+        )
+        # the fact we can't find the auth event doesn't mean it doesn't
+        # exist, which means it is premature to store `event` as rejected.
+        # instead we raise an AuthError, which will make the caller ignore it.
+        raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
+
     async def _get_remote_auth_chain_for_event(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1624,9 +1830,7 @@ class FederationEventHandler:
         for s in seen_remotes:
             remote_event_map.pop(s, None)
 
-        await self._auth_and_persist_fetched_events(
-            destination, room_id, remote_event_map.values()
-        )
+        await self._auth_and_persist_outliers(room_id, remote_event_map.values())
 
     async def _update_context_for_auth_events(
         self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
@@ -1696,16 +1900,27 @@ class FederationEventHandler:
         # persist_events_and_notify directly.)
         assert not event.internal_metadata.outlier
 
-        try:
-            if (
-                not backfilled
-                and not context.rejected
-                and (await self._store.get_min_depth(event.room_id)) <= event.depth
-            ):
+        if not backfilled and not context.rejected:
+            min_depth = await self._store.get_min_depth(event.room_id)
+            if min_depth is None or min_depth > event.depth:
+                # XXX richvdh 2021/10/07: I don't really understand what this
+                # condition is doing. I think it's trying not to send pushes
+                # for events that predate our join - but that's not really what
+                # min_depth means, and anyway ancient events are a more general
+                # problem.
+                #
+                # for now I'm just going to log about it.
+                logger.info(
+                    "Skipping push actions for old event with depth %s < %s",
+                    event.depth,
+                    min_depth,
+                )
+            else:
                 await self._action_generator.handle_push_actions_for_event(
                     event, context
                 )
 
+        try:
             await self.persist_events_and_notify(
                 event.room_id, [(event, context)], backfilled=backfilled
             )
@@ -1837,6 +2052,3 @@ class FederationEventHandler:
                 len(ev.auth_event_ids()),
             )
             raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
-
-    async def get_min_depth_for_context(self, context: str) -> int:
-        return await self._store.get_min_depth(context)