diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 6b9a629edd..309a4ba664 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1501,6 +1501,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
event_id: The event that failed to be fetched or processed
cause: The error message or reason that we failed to pull the event
"""
+ logger.debug(
+ "record_event_failed_pull_attempt room_id=%s, event_id=%s, cause=%s",
+ room_id,
+ event_id,
+ cause,
+ )
await self.db_pool.runInteraction(
"record_event_failed_pull_attempt",
self._record_event_failed_pull_attempt_upsert_txn,
@@ -1530,6 +1536,54 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
+ @trace
+ async def get_event_ids_to_not_pull_from_backoff(
+ self,
+ room_id: str,
+ event_ids: Collection[str],
+ ) -> List[str]:
+ """
+ Filter down the events to the ones whose last failed pull attempt is still
+ within its exponential backoff window, i.e. ones we should not retry yet.
+
+ Args:
+ room_id: The room that the events belong to (not used to filter the lookup)
+ event_ids: The event IDs to look up in `event_failed_pull_attempts`
+
+ Returns:
+ List of event_ids that are still backing off and should not be pulled yet
+ """
+ event_failed_pull_attempts = await self.db_pool.simple_select_many_batch(
+ table="event_failed_pull_attempts",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=(
+ "event_id",
+ "last_attempt_ts",
+ "num_attempts",
+ ),
+ desc="get_event_ids_to_not_pull_from_backoff",
+ )
+
+ current_time = self._clock.time_msec()
+ return [
+ event_failed_pull_attempt["event_id"]
+ for event_failed_pull_attempt in event_failed_pull_attempts
+ # Skip while now < last_attempt_ts + 2**min(num_attempts, cap) * step: the
+ # wait doubles with each recorded attempt (up to the cap) to avoid hammering.
+ if current_time
+ < event_failed_pull_attempt["last_attempt_ts"]
+ + (
+ 2
+ ** min(
+ event_failed_pull_attempt["num_attempts"],
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+ )
+ )
+ * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
+ ]
+
async def get_missing_events(
self,
room_id: str,
|