summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2021-08-18 12:36:22 +0100
committerGitHub <noreply@github.com>2021-08-18 12:36:22 +0100
commit964f29cb6f4ff5892575ac170aa872822f2c1a0e (patch)
treef8e5b44e9bc9a5f8be41b65d999fd67038266461 /synapse/handlers/federation.py
parentUse auto-attribs for attrs classes for sync. (#10630) (diff)
downloadsynapse-964f29cb6f4ff5892575ac170aa872822f2c1a0e.tar.xz
Refactor `on_receive_pdu` code (#10615)
* drop room pdu linearizer sooner

No point holding onto it while we recheck the db

* move out `missing_prevs` calculation

we're going to need `missing_prevs` whatever we do, so we may as well calculate
it eagerly and just update it if it gets outdated.

* Add another `if missing_prevs` condition

this should be a no-op, since all the code inside the block already checks `if
missing_prevs`

* reorder if conditions

This shouldn't change the logic at all.

* Push down `min_depth` read

No point reading it from the database unless we're going to use it.

* Collect the sent_to_us_directly code together

Move the remaining `sent_to_us_directly` code inside the `if
sent_to_us_directly` block.

* Properly separate the `not sent_to_us_directly` branch

Since the only way this second block is now reachable is if we
*didn't* go into the `sent_to_us_directly` branch, we can replace it with a
simple `else`.

* changelog
Diffstat (limited to '')
-rw-r--r--synapse/handlers/federation.py271
1 files changed, 137 insertions, 134 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a257d87fed..529d025c39 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -285,169 +285,172 @@ class FederationHandler(BaseHandler):
         #  - Fetching any missing prev events to fill in gaps in the graph
         #  - Fetching state if we have a hole in the graph
         if not pdu.internal_metadata.is_outlier():
-            # We only backfill backwards to the min depth.
-            min_depth = await self.get_min_depth_for_context(pdu.room_id)
-
-            logger.debug("min_depth: %d", min_depth)
-
             prevs = set(pdu.prev_event_ids())
             seen = await self.store.have_events_in_timeline(prevs)
+            missing_prevs = prevs - seen
 
-            if min_depth is not None and pdu.depth > min_depth:
-                missing_prevs = prevs - seen
-                if sent_to_us_directly and missing_prevs:
-                    # If we're missing stuff, ensure we only fetch stuff one
-                    # at a time.
-                    logger.info(
-                        "Acquiring room lock to fetch %d missing prev_events: %s",
-                        len(missing_prevs),
-                        shortstr(missing_prevs),
-                    )
-                    with (await self._room_pdu_linearizer.queue(pdu.room_id)):
+            if missing_prevs:
+                if sent_to_us_directly:
+                    # We only backfill backwards to the min depth.
+                    min_depth = await self.get_min_depth_for_context(pdu.room_id)
+                    logger.debug("min_depth: %d", min_depth)
+
+                    if min_depth is not None and pdu.depth > min_depth:
+                        # If we're missing stuff, ensure we only fetch stuff one
+                        # at a time.
                         logger.info(
-                            "Acquired room lock to fetch %d missing prev_events",
+                            "Acquiring room lock to fetch %d missing prev_events: %s",
                             len(missing_prevs),
+                            shortstr(missing_prevs),
                         )
-
-                        try:
-                            await self._get_missing_events_for_pdu(
-                                origin, pdu, prevs, min_depth
+                        with (await self._room_pdu_linearizer.queue(pdu.room_id)):
+                            logger.info(
+                                "Acquired room lock to fetch %d missing prev_events",
+                                len(missing_prevs),
                             )
-                        except Exception as e:
-                            raise Exception(
-                                "Error fetching missing prev_events for %s: %s"
-                                % (event_id, e)
-                            ) from e
+
+                            try:
+                                await self._get_missing_events_for_pdu(
+                                    origin, pdu, prevs, min_depth
+                                )
+                            except Exception as e:
+                                raise Exception(
+                                    "Error fetching missing prev_events for %s: %s"
+                                    % (event_id, e)
+                                ) from e
 
                         # Update the set of things we've seen after trying to
                         # fetch the missing stuff
                         seen = await self.store.have_events_in_timeline(prevs)
+                        missing_prevs = prevs - seen
+
+                        if not missing_prevs:
+                            logger.info("Found all missing prev_events")
+
+                    if missing_prevs:
+                        # since this event was pushed to us, it is possible for it to
+                        # become the only forward-extremity in the room, and we would then
+                        # trust its state to be the state for the whole room. This is very
+                        # bad. Further, if the event was pushed to us, there is no excuse
+                        # for us not to have all the prev_events. (XXX: apart from
+                        # min_depth?)
+                        #
+                        # We therefore reject any such events.
+                        logger.warning(
+                            "Rejecting: failed to fetch %d prev events: %s",
+                            len(missing_prevs),
+                            shortstr(missing_prevs),
+                        )
+                        raise FederationError(
+                            "ERROR",
+                            403,
+                            (
+                                "Your server isn't divulging details about prev_events "
+                                "referenced in this event."
+                            ),
+                            affected=pdu.event_id,
+                        )
 
-                        if not prevs - seen:
-                            logger.info(
-                                "Found all missing prev_events",
-                            )
-
-            missing_prevs = prevs - seen
-            if missing_prevs:
-                # We've still not been able to get all of the prev_events for this event.
-                #
-                # In this case, we need to fall back to asking another server in the
-                # federation for the state at this event. That's ok provided we then
-                # resolve the state against other bits of the DAG before using it (which
-                # will ensure that you can't just take over a room by sending an event,
-                # withholding its prev_events, and declaring yourself to be an admin in
-                # the subsequent state request).
-                #
-                # Now, if we're pulling this event as a missing prev_event, then clearly
-                # this event is not going to become the only forward-extremity and we are
-                # guaranteed to resolve its state against our existing forward
-                # extremities, so that should be fine.
-                #
-                # On the other hand, if this event was pushed to us, it is possible for
-                # it to become the only forward-extremity in the room, and we would then
-                # trust its state to be the state for the whole room. This is very bad.
-                # Further, if the event was pushed to us, there is no excuse for us not to
-                # have all the prev_events. We therefore reject any such events.
-                #
-                # XXX this really feels like it could/should be merged with the above,
-                # but there is an interaction with min_depth that I'm not really
-                # following.
-
-                if sent_to_us_directly:
-                    logger.warning(
-                        "Rejecting: failed to fetch %d prev events: %s",
-                        len(missing_prevs),
+                else:
+                    # We don't have all of the prev_events for this event.
+                    #
+                    # In this case, we need to fall back to asking another server in the
+                    # federation for the state at this event. That's ok provided we then
+                    # resolve the state against other bits of the DAG before using it (which
+                    # will ensure that you can't just take over a room by sending an event,
+                    # withholding its prev_events, and declaring yourself to be an admin in
+                    # the subsequent state request).
+                    #
+                    # Since we're pulling this event as a missing prev_event, then clearly
+                    # this event is not going to become the only forward-extremity and we are
+                    # guaranteed to resolve its state against our existing forward
+                    # extremities, so that should be fine.
+                    #
+                    # XXX this really feels like it could/should be merged with the above,
+                    # but there is an interaction with min_depth that I'm not really
+                    # following.
+                    logger.info(
+                        "Event %s is missing prev_events %s: calculating state for a "
+                        "backwards extremity",
+                        event_id,
                         shortstr(missing_prevs),
                     )
-                    raise FederationError(
-                        "ERROR",
-                        403,
-                        (
-                            "Your server isn't divulging details about prev_events "
-                            "referenced in this event."
-                        ),
-                        affected=pdu.event_id,
-                    )
 
-                logger.info(
-                    "Event %s is missing prev_events %s: calculating state for a "
-                    "backwards extremity",
-                    event_id,
-                    shortstr(missing_prevs),
-                )
+                    # Calculate the state after each of the previous events, and
+                    # resolve them to find the correct state at the current event.
+                    event_map = {event_id: pdu}
+                    try:
+                        # Get the state of the events we know about
+                        ours = await self.state_store.get_state_groups_ids(
+                            room_id, seen
+                        )
 
-                # Calculate the state after each of the previous events, and
-                # resolve them to find the correct state at the current event.
-                event_map = {event_id: pdu}
-                try:
-                    # Get the state of the events we know about
-                    ours = await self.state_store.get_state_groups_ids(room_id, seen)
-
-                    # state_maps is a list of mappings from (type, state_key) to event_id
-                    state_maps: List[StateMap[str]] = list(ours.values())
-
-                    # we don't need this any more, let's delete it.
-                    del ours
-
-                    # Ask the remote server for the states we don't
-                    # know about
-                    for p in missing_prevs:
-                        logger.info("Requesting state after missing prev_event %s", p)
-
-                        with nested_logging_context(p):
-                            # note that if any of the missing prevs share missing state or
-                            # auth events, the requests to fetch those events are deduped
-                            # by the get_pdu_cache in federation_client.
-                            remote_state = (
-                                await self._get_state_after_missing_prev_event(
-                                    origin, room_id, p
-                                )
+                        # state_maps is a list of mappings from (type, state_key) to event_id
+                        state_maps: List[StateMap[str]] = list(ours.values())
+
+                        # we don't need this any more, let's delete it.
+                        del ours
+
+                        # Ask the remote server for the states we don't
+                        # know about
+                        for p in missing_prevs:
+                            logger.info(
+                                "Requesting state after missing prev_event %s", p
                             )
 
-                            remote_state_map = {
-                                (x.type, x.state_key): x.event_id for x in remote_state
-                            }
-                            state_maps.append(remote_state_map)
+                            with nested_logging_context(p):
+                                # note that if any of the missing prevs share missing state or
+                                # auth events, the requests to fetch those events are deduped
+                                # by the get_pdu_cache in federation_client.
+                                remote_state = (
+                                    await self._get_state_after_missing_prev_event(
+                                        origin, room_id, p
+                                    )
+                                )
+
+                                remote_state_map = {
+                                    (x.type, x.state_key): x.event_id
+                                    for x in remote_state
+                                }
+                                state_maps.append(remote_state_map)
 
-                            for x in remote_state:
-                                event_map[x.event_id] = x
+                                for x in remote_state:
+                                    event_map[x.event_id] = x
 
-                    room_version = await self.store.get_room_version_id(room_id)
-                    state_map = (
-                        await self._state_resolution_handler.resolve_events_with_store(
+                        room_version = await self.store.get_room_version_id(room_id)
+                        state_map = await self._state_resolution_handler.resolve_events_with_store(
                             room_id,
                             room_version,
                             state_maps,
                             event_map,
                             state_res_store=StateResolutionStore(self.store),
                         )
-                    )
 
-                    # We need to give _process_received_pdu the actual state events
-                    # rather than event ids, so generate that now.
+                        # We need to give _process_received_pdu the actual state events
+                        # rather than event ids, so generate that now.
 
-                    # First though we need to fetch all the events that are in
-                    # state_map, so we can build up the state below.
-                    evs = await self.store.get_events(
-                        list(state_map.values()),
-                        get_prev_content=False,
-                        redact_behaviour=EventRedactBehaviour.AS_IS,
-                    )
-                    event_map.update(evs)
+                        # First though we need to fetch all the events that are in
+                        # state_map, so we can build up the state below.
+                        evs = await self.store.get_events(
+                            list(state_map.values()),
+                            get_prev_content=False,
+                            redact_behaviour=EventRedactBehaviour.AS_IS,
+                        )
+                        event_map.update(evs)
 
-                    state = [event_map[e] for e in state_map.values()]
-                except Exception:
-                    logger.warning(
-                        "Error attempting to resolve state at missing " "prev_events",
-                        exc_info=True,
-                    )
-                    raise FederationError(
-                        "ERROR",
-                        403,
-                        "We can't get valid state history.",
-                        affected=event_id,
-                    )
+                        state = [event_map[e] for e in state_map.values()]
+                    except Exception:
+                        logger.warning(
+                            "Error attempting to resolve state at missing "
+                            "prev_events",
+                            exc_info=True,
+                        )
+                        raise FederationError(
+                            "ERROR",
+                            403,
+                            "We can't get valid state history.",
+                            affected=event_id,
+                        )
 
         # A second round of checks for all events. Check that the event passes auth
         # based on `auth_events`, this allows us to assert that the event would