diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8197b60b76..8b602e3813 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -42,6 +42,7 @@ from twisted.internet import defer
from synapse import event_auth
from synapse.api.constants import (
+ EventContentFields,
EventTypes,
Membership,
RejectedReason,
@@ -262,7 +263,12 @@ class FederationHandler(BaseHandler):
state = None
- # Get missing pdus if necessary.
+ # Check that the event passes auth based on the state at the event. This is
+ # done for events that are to be added to the timeline (non-outliers).
+ #
+ # Get missing pdus if necessary:
+ # - Fetching any missing prev events to fill in gaps in the graph
+ # - Fetching state if we have a hole in the graph
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
min_depth = await self.get_min_depth_for_context(pdu.room_id)
@@ -432,6 +438,13 @@ class FederationHandler(BaseHandler):
affected=event_id,
)
+ # A second round of checks for all events. Check that the event passes auth
+ # based on `auth_events`, which allows us to assert that the event would
+ # have been allowed at some point. If an event passes this check it's OK
+ # for it to be used as part of a returned `/state` request, as either
+ # a) we received the event as part of the original join and so trust it, or
+ # b) we'll do a state resolution with existing state before it becomes
+ # part of the "current state", which adds more protection.
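+ # Note that `state` is only set above when the event is not an outlier and we
+ # had a hole in the graph that required fetching the state at the event.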
await self._process_received_pdu(origin, pdu, state=state)
async def _get_missing_events_for_pdu(
@@ -889,6 +902,79 @@ class FederationHandler(BaseHandler):
"resync_device_due_to_pdu", self._resync_device, event.sender
)
+ await self._handle_marker_event(origin, event)
+
+ async def _handle_marker_event(self, origin: str, marker_event: EventBase):
+ """Handles backfilling the insertion event when we receive a marker
+ event that points to one.
+
+ Args:
+ origin: Origin of the marker event. Will be queried to fetch the insertion event.
+ marker_event: The event to process
+ """
+
+ if marker_event.type != EventTypes.MSC2716_MARKER:
+ # Not a marker event
+ return
+
+ if marker_event.rejected_reason is not None:
+ # Rejected event
+ return
+
+ # Skip processing a marker event if the room version doesn't
+ # support it.
+ room_version = await self.store.get_room_version(marker_event.room_id)
+ if not room_version.msc2716_historical:
+ return
+
+ logger.debug("_handle_marker_event: received %s", marker_event)
+
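+ # The marker event is expected to reference the insertion event by ID in its
+ # content, e.g. (illustrative shape only):
+ # { EventContentFields.MSC2716_MARKER_INSERTION: "$insertion_event_id" }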
+ insertion_event_id = marker_event.content.get(
+ EventContentFields.MSC2716_MARKER_INSERTION
+ )
+
+ if insertion_event_id is None:
+ # No insertion event to retrieve (invalid marker)
+ return
+
+ logger.debug(
+ "_handle_marker_event: backfilling insertion event %s", insertion_event_id
+ )
+
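+ # Ask the origin server for the insertion event and persist it, giving us a
+ # starting point for backfilling the historical batch it belongs to.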
+ await self._get_events_and_persist(
+ origin,
+ marker_event.room_id,
+ [insertion_event_id],
+ )
+
+ insertion_event = await self.store.get_event(
+ insertion_event_id, allow_none=True
+ )
+ if insertion_event is None:
+ logger.warning(
+ "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
+ origin,
+ insertion_event_id,
+ marker_event.event_id,
+ )
+ return
+
+ logger.debug(
+ "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
+ insertion_event,
+ marker_event,
+ )
+
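+ # Record the insertion event as an "insertion extremity" so that
+ # _maybe_backfill_inner will later consider it when looking for places to
+ # backfill from.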
+ await self.store.insert_insertion_extremity(
+ insertion_event_id, marker_event.room_id
+ )
+
+ logger.debug(
+ "_handle_marker_event: insertion extremity added for %s from marker event %s",
+ insertion_event,
+ marker_event,
+ )
+
async def _resync_device(self, sender: str) -> None:
"""We have detected that the device list for the given user may be out
of sync, so we try and resync them.
@@ -1057,9 +1143,19 @@ class FederationHandler(BaseHandler):
async def _maybe_backfill_inner(
self, room_id: str, current_depth: int, limit: int
) -> bool:
- extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
+ oldest_events_with_depth = (
+ await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
+ )
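+ # Insertion events (MSC2716) that we have seen a marker for, but have not yet
+ # backfilled behind, act as additional backwards extremities.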
+ insertion_events_to_be_backfilled = (
+ await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
+ )
+ logger.debug(
+ "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
+ oldest_events_with_depth,
+ insertion_events_to_be_backfilled,
+ )
- if not extremities:
+ if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
logger.debug("Not backfilling as no extremeties found.")
return False
@@ -1089,10 +1185,12 @@ class FederationHandler(BaseHandler):
# state *before* the event, ignoring the special casing certain event
# types have.
- forward_events = await self.store.get_successor_events(list(extremities))
+ forward_event_ids = await self.store.get_successor_events(
+ list(oldest_events_with_depth)
+ )
extremities_events = await self.store.get_events(
- forward_events,
+ forward_event_ids,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
)
@@ -1106,10 +1204,19 @@ class FederationHandler(BaseHandler):
redact=False,
check_history_visibility_only=True,
)
+ logger.debug(
+ "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
+ )
- if not filtered_extremities:
+ if not filtered_extremities and not insertion_events_to_be_backfilled:
return False
+ extremities = {
+ **oldest_events_with_depth,
+ # TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
+ **insertion_events_to_be_backfilled,
+ }
+
# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
max_depth = sorted_extremeties_tuple[0][1]