diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2021-08-18 12:36:22 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-18 12:36:22 +0100 |
commit | 964f29cb6f4ff5892575ac170aa872822f2c1a0e (patch) | |
tree | f8e5b44e9bc9a5f8be41b65d999fd67038266461 /synapse/handlers | |
parent | Use auto-attribs for attrs classes for sync. (#10630) (diff) | |
download | synapse-964f29cb6f4ff5892575ac170aa872822f2c1a0e.tar.xz |
Refactor `on_receive_pdu` code (#10615)
* drop room pdu linearizer sooner No point holding onto it while we recheck the db * move out `missing_prevs` calculation we're going to need `missing_prevs` whatever we do, so we may as well calculate it eagerly and just update it if it gets outdated. * Add another `if missing_prevs` condition this should be a no-op, since all the code inside the block already checks `if missing_prevs` * reorder if conditions This shouldn't change the logic at all. * Push down `min_depth` read No point reading it from the database unless we're going to use it. * Collect the sent_to_us_directly code together Move the remaining `sent_to_us_directly` code inside the `if sent_to_us_directly` block. * Properly separate the `not sent_to_us_directly` branch Since the only way this second block is now reachable is if we *didn't* go into the `sent_to_us_directly` branch, we can replace it with a simple `else`. * changelog
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/federation.py | 271 |
1 files changed, 137 insertions, 134 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a257d87fed..529d025c39 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -285,169 +285,172 @@ class FederationHandler(BaseHandler): # - Fetching any missing prev events to fill in gaps in the graph # - Fetching state if we have a hole in the graph if not pdu.internal_metadata.is_outlier(): - # We only backfill backwards to the min depth. - min_depth = await self.get_min_depth_for_context(pdu.room_id) - - logger.debug("min_depth: %d", min_depth) - prevs = set(pdu.prev_event_ids()) seen = await self.store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen - if min_depth is not None and pdu.depth > min_depth: - missing_prevs = prevs - seen - if sent_to_us_directly and missing_prevs: - # If we're missing stuff, ensure we only fetch stuff one - # at a time. - logger.info( - "Acquiring room lock to fetch %d missing prev_events: %s", - len(missing_prevs), - shortstr(missing_prevs), - ) - with (await self._room_pdu_linearizer.queue(pdu.room_id)): + if missing_prevs: + if sent_to_us_directly: + # We only backfill backwards to the min depth. + min_depth = await self.get_min_depth_for_context(pdu.room_id) + logger.debug("min_depth: %d", min_depth) + + if min_depth is not None and pdu.depth > min_depth: + # If we're missing stuff, ensure we only fetch stuff one + # at a time. logger.info( - "Acquired room lock to fetch %d missing prev_events", + "Acquiring room lock to fetch %d missing prev_events: %s", len(missing_prevs), + shortstr(missing_prevs), ) - - try: - await self._get_missing_events_for_pdu( - origin, pdu, prevs, min_depth + with (await self._room_pdu_linearizer.queue(pdu.room_id)): + logger.info( + "Acquired room lock to fetch %d missing prev_events", + len(missing_prevs), ) - except Exception as e: - raise Exception( - "Error fetching missing prev_events for %s: %s" - % (event_id, e) - ) from e + + try: + await self._get_missing_events_for_pdu( + origin, pdu, prevs, min_depth + ) + except Exception as e: + raise Exception( + "Error fetching missing prev_events for %s: %s" + % (event_id, e) + ) from e # Update the set of things we've seen after trying to # fetch the missing stuff seen = await self.store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen + + if not missing_prevs: + logger.info("Found all missing prev_events") + + if missing_prevs: + # since this event was pushed to us, it is possible for it to + # become the only forward-extremity in the room, and we would then + # trust its state to be the state for the whole room. This is very + # bad. Further, if the event was pushed to us, there is no excuse + # for us not to have all the prev_events. (XXX: apart from + # min_depth?) + # + # We therefore reject any such events. + logger.warning( + "Rejecting: failed to fetch %d prev events: %s", + len(missing_prevs), + shortstr(missing_prevs), + ) + raise FederationError( + "ERROR", + 403, + ( + "Your server isn't divulging details about prev_events " + "referenced in this event." + ), + affected=pdu.event_id, + ) - if not prevs - seen: - logger.info( - "Found all missing prev_events", - ) - - missing_prevs = prevs - seen - if missing_prevs: - # We've still not been able to get all of the prev_events for this event. - # - # In this case, we need to fall back to asking another server in the - # federation for the state at this event. That's ok provided we then - # resolve the state against other bits of the DAG before using it (which - # will ensure that you can't just take over a room by sending an event, - # withholding its prev_events, and declaring yourself to be an admin in - # the subsequent state request). - # - # Now, if we're pulling this event as a missing prev_event, then clearly - # this event is not going to become the only forward-extremity and we are - # guaranteed to resolve its state against our existing forward - # extremities, so that should be fine. - # - # On the other hand, if this event was pushed to us, it is possible for - # it to become the only forward-extremity in the room, and we would then - # trust its state to be the state for the whole room. This is very bad. - # Further, if the event was pushed to us, there is no excuse for us not to - # have all the prev_events. We therefore reject any such events. - # - # XXX this really feels like it could/should be merged with the above, - # but there is an interaction with min_depth that I'm not really - # following. - - if sent_to_us_directly: - logger.warning( - "Rejecting: failed to fetch %d prev events: %s", - len(missing_prevs), + else: + # We don't have all of the prev_events for this event. + # + # In this case, we need to fall back to asking another server in the + # federation for the state at this event. That's ok provided we then + # resolve the state against other bits of the DAG before using it (which + # will ensure that you can't just take over a room by sending an event, + # withholding its prev_events, and declaring yourself to be an admin in + # the subsequent state request). + # + # Since we're pulling this event as a missing prev_event, then clearly + # this event is not going to become the only forward-extremity and we are + # guaranteed to resolve its state against our existing forward + # extremities, so that should be fine. + # + # XXX this really feels like it could/should be merged with the above, + # but there is an interaction with min_depth that I'm not really + # following. + logger.info( + "Event %s is missing prev_events %s: calculating state for a " + "backwards extremity", + event_id, shortstr(missing_prevs), ) - raise FederationError( - "ERROR", - 403, - ( - "Your server isn't divulging details about prev_events " - "referenced in this event." - ), - affected=pdu.event_id, - ) - logger.info( - "Event %s is missing prev_events %s: calculating state for a " - "backwards extremity", - event_id, - shortstr(missing_prevs), - ) + # Calculate the state after each of the previous events, and + # resolve them to find the correct state at the current event. + event_map = {event_id: pdu} + try: + # Get the state of the events we know about + ours = await self.state_store.get_state_groups_ids( + room_id, seen + ) - # Calculate the state after each of the previous events, and - # resolve them to find the correct state at the current event. - event_map = {event_id: pdu} - try: - # Get the state of the events we know about - ours = await self.state_store.get_state_groups_ids(room_id, seen) - - # state_maps is a list of mappings from (type, state_key) to event_id - state_maps: List[StateMap[str]] = list(ours.values()) - - # we don't need this any more, let's delete it. - del ours - - # Ask the remote server for the states we don't - # know about - for p in missing_prevs: - logger.info("Requesting state after missing prev_event %s", p) - - with nested_logging_context(p): - # note that if any of the missing prevs share missing state or - # auth events, the requests to fetch those events are deduped - # by the get_pdu_cache in federation_client. - remote_state = ( - await self._get_state_after_missing_prev_event( - origin, room_id, p - ) + # state_maps is a list of mappings from (type, state_key) to event_id + state_maps: List[StateMap[str]] = list(ours.values()) + + # we don't need this any more, let's delete it. + del ours + + # Ask the remote server for the states we don't + # know about + for p in missing_prevs: + logger.info( + "Requesting state after missing prev_event %s", p ) - remote_state_map = { - (x.type, x.state_key): x.event_id for x in remote_state - } - state_maps.append(remote_state_map) + with nested_logging_context(p): + # note that if any of the missing prevs share missing state or + # auth events, the requests to fetch those events are deduped + # by the get_pdu_cache in federation_client. + remote_state = ( + await self._get_state_after_missing_prev_event( + origin, room_id, p + ) + ) + + remote_state_map = { + (x.type, x.state_key): x.event_id + for x in remote_state + } + state_maps.append(remote_state_map) - for x in remote_state: - event_map[x.event_id] = x + for x in remote_state: + event_map[x.event_id] = x - room_version = await self.store.get_room_version_id(room_id) - state_map = ( - await self._state_resolution_handler.resolve_events_with_store( + room_version = await self.store.get_room_version_id(room_id) + state_map = await self._state_resolution_handler.resolve_events_with_store( room_id, room_version, state_maps, event_map, state_res_store=StateResolutionStore(self.store), ) - ) - # We need to give _process_received_pdu the actual state events - # rather than event ids, so generate that now. + # We need to give _process_received_pdu the actual state events + # rather than event ids, so generate that now. - # First though we need to fetch all the events that are in - # state_map, so we can build up the state below. - evs = await self.store.get_events( - list(state_map.values()), - get_prev_content=False, - redact_behaviour=EventRedactBehaviour.AS_IS, - ) - event_map.update(evs) + # First though we need to fetch all the events that are in + # state_map, so we can build up the state below. + evs = await self.store.get_events( + list(state_map.values()), + get_prev_content=False, + redact_behaviour=EventRedactBehaviour.AS_IS, + ) + event_map.update(evs) - state = [event_map[e] for e in state_map.values()] - except Exception: - logger.warning( - "Error attempting to resolve state at missing " "prev_events", - exc_info=True, - ) - raise FederationError( - "ERROR", - 403, - "We can't get valid state history.", - affected=event_id, - ) + state = [event_map[e] for e in state_map.values()] + except Exception: + logger.warning( + "Error attempting to resolve state at missing " + "prev_events", + exc_info=True, + ) + raise FederationError( + "ERROR", + 403, + "We can't get valid state history.", + affected=event_id, + ) # A second round of checks for all events. Check that the event passes auth # based on `auth_events`, this allows us to assert that the event would |