diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c0e13bdaac..246df43501 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -203,18 +203,13 @@ class FederationHandler(BaseHandler):
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
- async def on_receive_pdu(
- self, origin: str, pdu: EventBase, sent_to_us_directly: bool = False
- ) -> None:
- """Process a PDU received via a federation /send/ transaction, or
- via backfill of missing prev_events
+ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
+ """Process a PDU received via a federation /send/ transaction
Args:
origin: server which initiated the /send/ transaction. Will
be used to fetch missing events or state.
pdu: received PDU
- sent_to_us_directly: True if this event was pushed to us; False if
- we pulled it as the result of a missing prev_event.
"""
room_id = pdu.room_id
@@ -276,8 +271,6 @@ class FederationHandler(BaseHandler):
)
return None
- state = None
-
# Check that the event passes auth based on the state at the event. This is
# done for events that are to be added to the timeline (non-outliers).
#
@@ -285,23 +278,16 @@ class FederationHandler(BaseHandler):
# - Fetching any missing prev events to fill in gaps in the graph
# - Fetching state if we have a hole in the graph
if not pdu.internal_metadata.is_outlier():
- # We only backfill backwards to the min depth.
- min_depth = await self.get_min_depth_for_context(pdu.room_id)
-
- logger.debug("min_depth: %d", min_depth)
-
prevs = set(pdu.prev_event_ids())
seen = await self.store.have_events_in_timeline(prevs)
+ missing_prevs = prevs - seen
+
+ if missing_prevs:
+ # We only backfill backwards to the min depth.
+ min_depth = await self.get_min_depth_for_context(pdu.room_id)
+ logger.debug("min_depth: %d", min_depth)
- if min_depth is not None and pdu.depth < min_depth:
- # This is so that we don't notify the user about this
- # message, to work around the fact that some events will
- # reference really really old events we really don't want to
- # send to the clients.
- pdu.internal_metadata.outlier = True
- elif min_depth is not None and pdu.depth > min_depth:
- missing_prevs = prevs - seen
- if sent_to_us_directly and missing_prevs:
+ if min_depth is not None and pdu.depth > min_depth:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
@@ -325,42 +311,23 @@ class FederationHandler(BaseHandler):
% (event_id, e)
) from e
- # Update the set of things we've seen after trying to
- # fetch the missing stuff
- seen = await self.store.have_events_in_timeline(prevs)
-
- if not prevs - seen:
- logger.info(
- "Found all missing prev_events",
- )
-
- missing_prevs = prevs - seen
- if missing_prevs:
- # We've still not been able to get all of the prev_events for this event.
- #
- # In this case, we need to fall back to asking another server in the
- # federation for the state at this event. That's ok provided we then
- # resolve the state against other bits of the DAG before using it (which
- # will ensure that you can't just take over a room by sending an event,
- # withholding its prev_events, and declaring yourself to be an admin in
- # the subsequent state request).
- #
- # Now, if we're pulling this event as a missing prev_event, then clearly
- # this event is not going to become the only forward-extremity and we are
- # guaranteed to resolve its state against our existing forward
- # extremities, so that should be fine.
- #
- # On the other hand, if this event was pushed to us, it is possible for
- # it to become the only forward-extremity in the room, and we would then
- # trust its state to be the state for the whole room. This is very bad.
- # Further, if the event was pushed to us, there is no excuse for us not to
- # have all the prev_events. We therefore reject any such events.
- #
- # XXX this really feels like it could/should be merged with the above,
- # but there is an interaction with min_depth that I'm not really
- # following.
-
- if sent_to_us_directly:
+ # Update the set of things we've seen after trying to
+ # fetch the missing stuff
+ seen = await self.store.have_events_in_timeline(prevs)
+ missing_prevs = prevs - seen
+
+ if not missing_prevs:
+ logger.info("Found all missing prev_events")
+
+ if missing_prevs:
+ # since this event was pushed to us, it is possible for it to
+ # become the only forward-extremity in the room, and we would then
+ # trust its state to be the state for the whole room. This is very
+ # bad. Further, if the event was pushed to us, there is no excuse
+ # for us not to have all the prev_events. (XXX: apart from
+ # min_depth?)
+ #
+ # We therefore reject any such events.
logger.warning(
"Rejecting: failed to fetch %d prev events: %s",
len(missing_prevs),
@@ -376,93 +343,7 @@ class FederationHandler(BaseHandler):
affected=pdu.event_id,
)
- logger.info(
- "Event %s is missing prev_events %s: calculating state for a "
- "backwards extremity",
- event_id,
- shortstr(missing_prevs),
- )
-
- # Calculate the state after each of the previous events, and
- # resolve them to find the correct state at the current event.
- event_map = {event_id: pdu}
- try:
- # Get the state of the events we know about
- ours = await self.state_store.get_state_groups_ids(room_id, seen)
-
- # state_maps is a list of mappings from (type, state_key) to event_id
- state_maps: List[StateMap[str]] = list(ours.values())
-
- # we don't need this any more, let's delete it.
- del ours
-
- # Ask the remote server for the states we don't
- # know about
- for p in missing_prevs:
- logger.info("Requesting state after missing prev_event %s", p)
-
- with nested_logging_context(p):
- # note that if any of the missing prevs share missing state or
- # auth events, the requests to fetch those events are deduped
- # by the get_pdu_cache in federation_client.
- remote_state = (
- await self._get_state_after_missing_prev_event(
- origin, room_id, p
- )
- )
-
- remote_state_map = {
- (x.type, x.state_key): x.event_id for x in remote_state
- }
- state_maps.append(remote_state_map)
-
- for x in remote_state:
- event_map[x.event_id] = x
-
- room_version = await self.store.get_room_version_id(room_id)
- state_map = (
- await self._state_resolution_handler.resolve_events_with_store(
- room_id,
- room_version,
- state_maps,
- event_map,
- state_res_store=StateResolutionStore(self.store),
- )
- )
-
- # We need to give _process_received_pdu the actual state events
- # rather than event ids, so generate that now.
-
- # First though we need to fetch all the events that are in
- # state_map, so we can build up the state below.
- evs = await self.store.get_events(
- list(state_map.values()),
- get_prev_content=False,
- redact_behaviour=EventRedactBehaviour.AS_IS,
- )
- event_map.update(evs)
-
- state = [event_map[e] for e in state_map.values()]
- except Exception:
- logger.warning(
- "Error attempting to resolve state at missing " "prev_events",
- exc_info=True,
- )
- raise FederationError(
- "ERROR",
- 403,
- "We can't get valid state history.",
- affected=event_id,
- )
-
- # A second round of checks for all events. Check that the event passes auth
- # based on `auth_events`, this allows us to assert that the event would
- # have been allowed at some point. If an event passes this check its OK
- # for it to be used as part of a returned `/state` request, as either
- # a) we received the event as part of the original join and so trust it, or
- # b) we'll do a state resolution with existing state before it becomes
- # part of the "current state", which adds more protection.
- await self._process_received_pdu(origin, pdu, state=state)
+ await self._process_received_pdu(origin, pdu, state=None)
async def _get_missing_events_for_pdu(
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
@@ -562,24 +443,7 @@ class FederationHandler(BaseHandler):
return
logger.info("Got %d prev_events", len(missing_events))
-
- # We want to sort these by depth so we process them and
- # tell clients about them in order.
- missing_events.sort(key=lambda x: x.depth)
-
- for ev in missing_events:
- logger.info("Handling received prev_event %s", ev)
- with nested_logging_context(ev.event_id):
- try:
- await self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
- except FederationError as e:
- if e.code == 403:
- logger.warning(
- "Received prev_event %s failed history check.",
- ev.event_id,
- )
- else:
- raise
+ await self._process_pulled_events(origin, missing_events)
async def _get_state_for_room(
self,
@@ -1496,6 +1360,198 @@ class FederationHandler(BaseHandler):
event_infos,
)
+ async def _process_pulled_events(
+ self, origin: str, events: Iterable[EventBase]
+ ) -> None:
+ """Process a batch of events we have pulled from a remote server
+
+ Pulls in any events required to auth the events, persists the received events,
+ and notifies clients, if appropriate.
+
+ Assumes the events have already had their signatures and hashes checked.
+
+ Params:
+ origin: The server we received these events from
+ events: The received events.
+ """
+
+ # We want to sort these by depth so we process them and
+ # tell clients about them in order.
+ sorted_events = sorted(events, key=lambda x: x.depth)
+
+ for ev in sorted_events:
+ with nested_logging_context(ev.event_id):
+ await self._process_pulled_event(origin, ev)
+
+ async def _process_pulled_event(self, origin: str, event: EventBase) -> None:
+ """Process a single event that we have pulled from a remote server
+
+ Pulls in any events required to auth the event, persists the received event,
+ and notifies clients, if appropriate.
+
+ Assumes the event has already had its signatures and hashes checked.
+
+ This is somewhat equivalent to on_receive_pdu, but applies somewhat different
+ logic in the case that we are missing prev_events (in particular, it just
+ requests the state at that point, rather than triggering a get_missing_events) -
+ so is appropriate when we have pulled the event from a remote server, rather
+ than having it pushed to us.
+
+ Params:
+ origin: The server we received this event from
+ events: The received event
+ """
+ logger.info("Processing pulled event %s", event)
+
+ # these should not be outliers.
+ assert not event.internal_metadata.is_outlier()
+
+ event_id = event.event_id
+
+ existing = await self.store.get_event(
+ event_id, allow_none=True, allow_rejected=True
+ )
+ if existing:
+ if not existing.internal_metadata.is_outlier():
+ logger.info(
+ "Ignoring received event %s which we have already seen",
+ event_id,
+ )
+ return
+ logger.info("De-outliering event %s", event_id)
+
+ try:
+ self._sanity_check_event(event)
+ except SynapseError as err:
+ logger.warning("Event %s failed sanity check: %s", event_id, err)
+ return
+
+ try:
+ state = await self._resolve_state_at_missing_prevs(origin, event)
+ await self._process_received_pdu(origin, event, state=state)
+ except FederationError as e:
+ if e.code == 403:
+ logger.warning("Pulled event %s failed history check.", event_id)
+ else:
+ raise
+
+ async def _resolve_state_at_missing_prevs(
+ self, dest: str, event: EventBase
+ ) -> Optional[Iterable[EventBase]]:
+ """Calculate the state at an event with missing prev_events.
+
+ This is used when we have pulled a batch of events from a remote server, and
+ still don't have all the prev_events.
+
+ If we already have all the prev_events for `event`, this method does nothing.
+
+ Otherwise, the missing prevs become new backwards extremities, and we fall back
+ to asking the remote server for the state after each missing `prev_event`,
+ and resolving across them.
+
+ That's ok provided we then resolve the state against other bits of the DAG
+ before using it - in other words, that the received event `event` is not going
+ to become the only forwards_extremity in the room (which will ensure that you
+ can't just take over a room by sending an event, withholding its prev_events,
+ and declaring yourself to be an admin in the subsequent state request).
+
+ In other words: we should only call this method if `event` has been *pulled*
+ as part of a batch of missing prev events, or similar.
+
+ Params:
+ dest: the remote server to ask for state at the missing prevs. Typically,
+ this will be the server we got `event` from.
+ event: an event to check for missing prevs.
+
+ Returns:
+ if we already had all the prev events, `None`. Otherwise, returns a list of
+ the events in the state at `event`.
+ """
+ room_id = event.room_id
+ event_id = event.event_id
+
+ prevs = set(event.prev_event_ids())
+ seen = await self.store.have_events_in_timeline(prevs)
+ missing_prevs = prevs - seen
+
+ if not missing_prevs:
+ return None
+
+ logger.info(
+ "Event %s is missing prev_events %s: calculating state for a "
+ "backwards extremity",
+ event_id,
+ shortstr(missing_prevs),
+ )
+ # Calculate the state after each of the previous events, and
+ # resolve them to find the correct state at the current event.
+ event_map = {event_id: event}
+ try:
+ # Get the state of the events we know about
+ ours = await self.state_store.get_state_groups_ids(room_id, seen)
+
+ # state_maps is a list of mappings from (type, state_key) to event_id
+ state_maps: List[StateMap[str]] = list(ours.values())
+
+ # we don't need this any more, let's delete it.
+ del ours
+
+ # Ask the remote server for the states we don't
+ # know about
+ for p in missing_prevs:
+ logger.info("Requesting state after missing prev_event %s", p)
+
+ with nested_logging_context(p):
+ # note that if any of the missing prevs share missing state or
+ # auth events, the requests to fetch those events are deduped
+ # by the get_pdu_cache in federation_client.
+ remote_state = await self._get_state_after_missing_prev_event(
+ dest, room_id, p
+ )
+
+ remote_state_map = {
+ (x.type, x.state_key): x.event_id for x in remote_state
+ }
+ state_maps.append(remote_state_map)
+
+ for x in remote_state:
+ event_map[x.event_id] = x
+
+ room_version = await self.store.get_room_version_id(room_id)
+ state_map = await self._state_resolution_handler.resolve_events_with_store(
+ room_id,
+ room_version,
+ state_maps,
+ event_map,
+ state_res_store=StateResolutionStore(self.store),
+ )
+
+ # We need to give _process_received_pdu the actual state events
+ # rather than event ids, so generate that now.
+
+ # First though we need to fetch all the events that are in
+ # state_map, so we can build up the state below.
+ evs = await self.store.get_events(
+ list(state_map.values()),
+ get_prev_content=False,
+ redact_behaviour=EventRedactBehaviour.AS_IS,
+ )
+ event_map.update(evs)
+
+ state = [event_map[e] for e in state_map.values()]
+ except Exception:
+ logger.warning(
+ "Error attempting to resolve state at missing prev_events",
+ exc_info=True,
+ )
+ raise FederationError(
+ "ERROR",
+ 403,
+ "We can't get valid state history.",
+ affected=event_id,
+ )
+ return state
+
def _sanity_check_event(self, ev: EventBase) -> None:
"""
Do some early sanity checks of a received event
@@ -1764,7 +1820,7 @@ class FederationHandler(BaseHandler):
p,
)
with nested_logging_context(p.event_id):
- await self.on_receive_pdu(origin, p, sent_to_us_directly=True)
+ await self.on_receive_pdu(origin, p)
except Exception as e:
logger.warning(
"Error handling queued PDU %s from %s: %s", p.event_id, origin, e
@@ -2375,6 +2431,7 @@ class FederationHandler(BaseHandler):
not event.internal_metadata.is_outlier()
and not backfilled
and not context.rejected
+ and (await self.store.get_min_depth(event.room_id)) <= event.depth
):
await self.action_generator.handle_push_actions_for_event(
event, context
|