diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index c606207569..e0873b1913 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -640,6 +640,27 @@ class FederationError(RuntimeError):
}
+class FederationPullAttemptBackoffError(RuntimeError):
+ """
+ Raised to indicate that we are are deliberately not attempting to pull the given
+ event over federation because we've already done so recently and are backing off.
+
+ Attributes:
+ event_id: The event_id which we are refusing to pull
+ message: A custom error message that gives more context
+ """
+
+ def __init__(self, event_ids: List[str], message: Optional[str]):
+ self.event_ids = event_ids
+
+ if message:
+ error_message = message
+ else:
+ error_message = f"Not attempting to pull event_ids={self.event_ids} because we already tried to pull them recently (backing off)."
+
+ super().__init__(error_message)
+
+
class HttpResponseException(CodeMessageException):
"""
Represents an HTTP-level failure of an outbound request
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 44e70c6c3c..5f7e0a1f79 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
Codes,
FederationDeniedError,
FederationError,
+ FederationPullAttemptBackoffError,
HttpResponseException,
LimitExceededError,
NotFoundError,
@@ -1720,7 +1721,22 @@ class FederationHandler:
destination, event
)
break
+ except FederationPullAttemptBackoffError as exc:
+ # Log a warning about why we failed to process the event (the error message
+ # for `FederationPullAttemptBackoffError` is pretty good)
+ logger.warning("_sync_partial_state_room: %s", exc)
+ # We do not record a failed pull attempt when we backoff fetching a missing
+ # `prev_event` because not being able to fetch the `prev_events` just means
+ # we won't be able to de-outlier the pulled event. But we can still use an
+ # `outlier` in the state/auth chain for another event. So we shouldn't stop
+ # a downstream event from trying to pull it.
+ #
+ # This avoids a cascade of backoff for all events in the DAG downstream from
+ # one event backoff upstream.
except FederationError as e:
+ # TODO: We should `record_event_failed_pull_attempt` here,
+ # see https://github.com/matrix-org/synapse/issues/13700
+
if attempt == len(destinations) - 1:
# We have tried every remote server for this event. Give up.
# TODO(faster_joins) giving up isn't the right thing to do
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index f382961099..4300e8dd40 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -44,6 +44,7 @@ from synapse.api.errors import (
AuthError,
Codes,
FederationError,
+ FederationPullAttemptBackoffError,
HttpResponseException,
RequestSendFailed,
SynapseError,
@@ -567,6 +568,9 @@ class FederationEventHandler:
event: partial-state event to be de-partial-stated
Raises:
+ FederationPullAttemptBackoffError if we are are deliberately not attempting
+ to pull the given event over federation because we've already done so
+ recently and are backing off.
FederationError if we fail to request state from the remote server.
"""
logger.info("Updating state for %s", event.event_id)
@@ -901,6 +905,18 @@ class FederationEventHandler:
context,
backfilled=backfilled,
)
+ except FederationPullAttemptBackoffError as exc:
+ # Log a warning about why we failed to process the event (the error message
+ # for `FederationPullAttemptBackoffError` is pretty good)
+ logger.warning("_process_pulled_event: %s", exc)
+ # We do not record a failed pull attempt when we backoff fetching a missing
+ # `prev_event` because not being able to fetch the `prev_events` just means
+ # we won't be able to de-outlier the pulled event. But we can still use an
+ # `outlier` in the state/auth chain for another event. So we shouldn't stop
+ # a downstream event from trying to pull it.
+ #
+ # This avoids a cascade of backoff for all events in the DAG downstream from
+ # one event backoff upstream.
except FederationError as e:
await self._store.record_event_failed_pull_attempt(
event.room_id, event_id, str(e)
@@ -947,6 +963,9 @@ class FederationEventHandler:
The event context.
Raises:
+ FederationPullAttemptBackoffError if we are are deliberately not attempting
+ to pull the given event over federation because we've already done so
+ recently and are backing off.
FederationError if we fail to get the state from the remote server after any
missing `prev_event`s.
"""
@@ -957,6 +976,18 @@ class FederationEventHandler:
seen = await self._store.have_events_in_timeline(prevs)
missing_prevs = prevs - seen
+ # If we've already recently attempted to pull this missing event, don't
+ # try it again so soon. Since we have to fetch all of the prev_events, we can
+ # bail early here if we find any to ignore.
+ prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
+ room_id, missing_prevs
+ )
+ if len(prevs_to_ignore) > 0:
+ raise FederationPullAttemptBackoffError(
+ event_ids=prevs_to_ignore,
+ message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
+ )
+
if not missing_prevs:
return await self._state_handler.compute_event_context(event)
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 6b9a629edd..309a4ba664 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1501,6 +1501,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
event_id: The event that failed to be fetched or processed
cause: The error message or reason that we failed to pull the event
"""
+ logger.debug(
+ "record_event_failed_pull_attempt room_id=%s, event_id=%s, cause=%s",
+ room_id,
+ event_id,
+ cause,
+ )
await self.db_pool.runInteraction(
"record_event_failed_pull_attempt",
self._record_event_failed_pull_attempt_upsert_txn,
@@ -1530,6 +1536,54 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
+ @trace
+ async def get_event_ids_to_not_pull_from_backoff(
+ self,
+ room_id: str,
+ event_ids: Collection[str],
+ ) -> List[str]:
+ """
+ Filter down the events to ones that we've failed to pull before recently. Uses
+ exponential backoff.
+
+ Args:
+ room_id: The room that the events belong to
+ event_ids: A list of events to filter down
+
+ Returns:
+ List of event_ids that should not be attempted to be pulled
+ """
+ event_failed_pull_attempts = await self.db_pool.simple_select_many_batch(
+ table="event_failed_pull_attempts",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=(
+ "event_id",
+ "last_attempt_ts",
+ "num_attempts",
+ ),
+ desc="get_event_ids_to_not_pull_from_backoff",
+ )
+
+ current_time = self._clock.time_msec()
+ return [
+ event_failed_pull_attempt["event_id"]
+ for event_failed_pull_attempt in event_failed_pull_attempts
+ # Exponential back-off (up to the upper bound) so we don't try to
+ # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
+ if current_time
+ < event_failed_pull_attempt["last_attempt_ts"]
+ + (
+ 2
+ ** min(
+ event_failed_pull_attempt["num_attempts"],
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+ )
+ )
+ * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
+ ]
+
async def get_missing_events(
self,
room_id: str,
|