diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 17f2fd4458..6b9a629edd 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -73,13 +73,30 @@ pdus_pruned_from_federation_queue = Counter(
logger = logging.getLogger(__name__)
-BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int(
- datetime.timedelta(days=7).total_seconds()
-)
-BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int(
- datetime.timedelta(hours=1).total_seconds()
+# Parameters controlling exponential backoff between backfill failures.
+# After the first failure to backfill, we wait 2 hours before trying again. If the
+# second attempt fails, we wait 4 hours before trying again. If the third attempt fails,
+# we wait 8 hours before trying again, ... and so on.
+#
+# Each successive backoff period is twice as long as the last. However we cap this
+# period at a maximum of 2^8 = 256 hours: a little over 10 days. (This is the smallest
+# power of 2 which yields a maximum backoff period of at least 7 days---which was the
+# original maximum backoff period.) Even when we hit this cap, we will continue to
+# make backfill attempts once every 10 days.
+BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS = 8
+BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS = int(
+ datetime.timedelta(hours=1).total_seconds() * 1000
)
+# We need a cap on the power of 2 or else the backoff period
+# 2^N * (milliseconds per hour)
+# will overflow when calcuated within the database. We ensure overflow does not occur
+# by checking that the largest backoff period fits in a 32-bit signed integer.
+_LONGEST_BACKOFF_PERIOD_MILLISECONDS = (
+ 2**BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS
+) * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
+assert 0 < _LONGEST_BACKOFF_PERIOD_MILLISECONDS <= ((2**31) - 1)
+
# All the info we need while iterating the DAG while backfilling
@attr.s(frozen=True, slots=True, auto_attribs=True)
@@ -767,7 +784,15 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# persisted in our database yet (meaning we don't know their depth
# specifically). So we need to look for the approximate depth from
# the events connected to the current backwards extremeties.
- sql = """
+
+ if isinstance(self.database_engine, PostgresEngine):
+ least_function = "LEAST"
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ least_function = "MIN"
+ else:
+ raise RuntimeError("Unknown database engine")
+
+ sql = f"""
SELECT backward_extrem.event_id, event.depth FROM events AS event
/**
* Get the edge connections from the event_edges table
@@ -825,7 +850,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
*/
AND (
failed_backfill_attempt_info.event_id IS NULL
- OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
+ OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
+ (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
+ * ? /* step */
+ )
)
/**
* Sort from highest (closest to the `current_depth`) to the lowest depth
@@ -837,22 +865,15 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
LIMIT ?
"""
- if isinstance(self.database_engine, PostgresEngine):
- least_function = "least"
- elif isinstance(self.database_engine, Sqlite3Engine):
- least_function = "min"
- else:
- raise RuntimeError("Unknown database engine")
-
txn.execute(
- sql % (least_function,),
+ sql,
(
room_id,
False,
current_depth,
self._clock.time_msec(),
- 1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
- 1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
limit,
),
)
@@ -902,7 +923,14 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
def get_insertion_event_backward_extremities_in_room_txn(
txn: LoggingTransaction, room_id: str
) -> List[Tuple[str, int]]:
- sql = """
+ if isinstance(self.database_engine, PostgresEngine):
+ least_function = "LEAST"
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ least_function = "MIN"
+ else:
+ raise RuntimeError("Unknown database engine")
+
+ sql = f"""
SELECT
insertion_event_extremity.event_id, event.depth
/* We only want insertion events that are also marked as backwards extremities */
@@ -942,7 +970,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
*/
AND (
failed_backfill_attempt_info.event_id IS NULL
- OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
+ OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
+ (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
+ * ? /* step */
+ )
)
/**
* Sort from highest (closest to the `current_depth`) to the lowest depth
@@ -954,21 +985,14 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
LIMIT ?
"""
- if isinstance(self.database_engine, PostgresEngine):
- least_function = "least"
- elif isinstance(self.database_engine, Sqlite3Engine):
- least_function = "min"
- else:
- raise RuntimeError("Unknown database engine")
-
txn.execute(
- sql % (least_function,),
+ sql,
(
room_id,
current_depth,
self._clock.time_msec(),
- 1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
- 1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
limit,
),
)
|