summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/13936.feature1
-rw-r--r--synapse/storage/databases/main/event_federation.py82
-rw-r--r--tests/storage/test_event_federation.py61
3 files changed, 103 insertions, 41 deletions
diff --git a/changelog.d/13936.feature b/changelog.d/13936.feature
new file mode 100644
index 0000000000..d86bf7ed80
--- /dev/null
+++ b/changelog.d/13936.feature
@@ -0,0 +1 @@
+Exponentially backoff from backfilling the same event over and over.
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 17f2fd4458..6b9a629edd 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -73,13 +73,30 @@ pdus_pruned_from_federation_queue = Counter(
 
 logger = logging.getLogger(__name__)
 
-BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int(
-    datetime.timedelta(days=7).total_seconds()
-)
-BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int(
-    datetime.timedelta(hours=1).total_seconds()
+# Parameters controlling exponential backoff between backfill failures.
+# After the first failure to backfill, we wait 2 hours before trying again. If the
+# second attempt fails, we wait 4 hours before trying again. If the third attempt fails,
+# we wait 8 hours before trying again, ... and so on.
+#
+# Each successive backoff period is twice as long as the last. However we cap this
+# period at a maximum of 2^8 = 256 hours: a little over 10 days. (This is the smallest
+# power of 2 which yields a maximum backoff period of at least 7 days---which was the
+# original maximum backoff period.) Even when we hit this cap, we will continue to
+# make backfill attempts once every 10 days.
+BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS = 8
+BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS = int(
+    datetime.timedelta(hours=1).total_seconds() * 1000
 )
 
+# We need a cap on the power of 2 or else the backoff period
+#   2^N * (milliseconds per hour)
+# will overflow when calcuated within the database. We ensure overflow does not occur
+# by checking that the largest backoff period fits in a 32-bit signed integer.
+_LONGEST_BACKOFF_PERIOD_MILLISECONDS = (
+    2**BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS
+) * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
+assert 0 < _LONGEST_BACKOFF_PERIOD_MILLISECONDS <= ((2**31) - 1)
+
 
 # All the info we need while iterating the DAG while backfilling
 @attr.s(frozen=True, slots=True, auto_attribs=True)
@@ -767,7 +784,15 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             # persisted in our database yet (meaning we don't know their depth
             # specifically). So we need to look for the approximate depth from
             # the events connected to the current backwards extremeties.
-            sql = """
+
+            if isinstance(self.database_engine, PostgresEngine):
+                least_function = "LEAST"
+            elif isinstance(self.database_engine, Sqlite3Engine):
+                least_function = "MIN"
+            else:
+                raise RuntimeError("Unknown database engine")
+
+            sql = f"""
                 SELECT backward_extrem.event_id, event.depth FROM events AS event
                 /**
                  * Get the edge connections from the event_edges table
@@ -825,7 +850,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                      */
                     AND (
                         failed_backfill_attempt_info.event_id IS NULL
-                        OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
+                        OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
+                            (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
+                            * ? /* step */
+                        )
                     )
                 /**
                  * Sort from highest (closest to the `current_depth`) to the lowest depth
@@ -837,22 +865,15 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                 LIMIT ?
             """
 
-            if isinstance(self.database_engine, PostgresEngine):
-                least_function = "least"
-            elif isinstance(self.database_engine, Sqlite3Engine):
-                least_function = "min"
-            else:
-                raise RuntimeError("Unknown database engine")
-
             txn.execute(
-                sql % (least_function,),
+                sql,
                 (
                     room_id,
                     False,
                     current_depth,
                     self._clock.time_msec(),
-                    1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
-                    1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
                     limit,
                 ),
             )
@@ -902,7 +923,14 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         def get_insertion_event_backward_extremities_in_room_txn(
             txn: LoggingTransaction, room_id: str
         ) -> List[Tuple[str, int]]:
-            sql = """
+            if isinstance(self.database_engine, PostgresEngine):
+                least_function = "LEAST"
+            elif isinstance(self.database_engine, Sqlite3Engine):
+                least_function = "MIN"
+            else:
+                raise RuntimeError("Unknown database engine")
+
+            sql = f"""
                 SELECT
                     insertion_event_extremity.event_id, event.depth
                 /* We only want insertion events that are also marked as backwards extremities */
@@ -942,7 +970,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                      */
                     AND (
                         failed_backfill_attempt_info.event_id IS NULL
-                        OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
+                        OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
+                            (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
+                            * ? /* step */
+                        )
                     )
                 /**
                  * Sort from highest (closest to the `current_depth`) to the lowest depth
@@ -954,21 +985,14 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                 LIMIT ?
             """
 
-            if isinstance(self.database_engine, PostgresEngine):
-                least_function = "least"
-            elif isinstance(self.database_engine, Sqlite3Engine):
-                least_function = "min"
-            else:
-                raise RuntimeError("Unknown database engine")
-
             txn.execute(
-                sql % (least_function,),
+                sql,
                 (
                     room_id,
                     current_depth,
                     self._clock.time_msec(),
-                    1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
-                    1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
                     limit,
                 ),
             )
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 398f338b66..59b8910907 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -766,9 +766,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             self.store.get_backfill_points_in_room(room_id, depth_map["B"], limit=100)
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(
-            backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"]
-        )
+        self.assertEqual(backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"])
 
         # Try at "A"
         backfill_points = self.get_success(
@@ -814,7 +812,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         # Only the backfill points that we didn't record earlier exist here.
-        self.assertListEqual(backfill_event_ids, ["b6", "2", "b1"])
+        self.assertEqual(backfill_event_ids, ["b6", "2", "b1"])
 
     def test_get_backfill_points_in_room_attempted_event_retry_after_backoff_duration(
         self,
@@ -860,7 +858,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(backfill_event_ids, ["b3", "b2"])
+        self.assertEqual(backfill_event_ids, ["b3", "b2"])
 
         # Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
         # see if we can now backfill it
@@ -871,7 +869,48 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(backfill_event_ids, ["b3", "b2", "b1"])
+        self.assertEqual(backfill_event_ids, ["b3", "b2", "b1"])
+
+    def test_get_backfill_points_in_room_works_after_many_failed_pull_attempts_that_could_naively_overflow(
+        self,
+    ) -> None:
+        """
+        A test that reproduces #13929 (Postgres only).
+
+        Test to make sure we can still get backfill points after many failed pull
+        attempts that cause us to backoff to the limit. Even if the backoff formula
+        would tell us to wait for more seconds than can be expressed in a 32 bit
+        signed int.
+        """
+        setup_info = self._setup_room_for_backfill_tests()
+        room_id = setup_info.room_id
+        depth_map = setup_info.depth_map
+
+        # Pretend that we have tried and failed 10 times to backfill event b1.
+        for _ in range(10):
+            self.get_success(
+                self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause")
+            )
+
+        # If the backoff periods grow without limit:
+        # After the first failed attempt, we would have backed off for 1 << 1 = 2 hours.
+        # After the second failed attempt we would have backed off for 1 << 2 = 4 hours,
+        # so after the 10th failed attempt we should backoff for 1 << 10 == 1024 hours.
+        # Wait 1100 hours just so we have a nice round number.
+        self.reactor.advance(datetime.timedelta(hours=1100).total_seconds())
+
+        # 1024 hours in milliseconds is 1024 * 3600000, which exceeds the largest 32 bit
+        # signed integer. The bug we're reproducing is that this overflow causes an
+        # error in postgres preventing us from fetching a set of backwards extremities
+        # to retry fetching.
+        backfill_points = self.get_success(
+            self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
+        )
+
+        # We should aim to fetch all backoff points: b1's latest backoff period has
+        # expired, and we haven't tried the rest.
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        self.assertEqual(backfill_event_ids, ["b3", "b2", "b1"])
 
     def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo:
         """
@@ -965,9 +1004,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             )
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(
-            backfill_event_ids, ["insertion_eventB", "insertion_eventA"]
-        )
+        self.assertEqual(backfill_event_ids, ["insertion_eventB", "insertion_eventA"])
 
         # Try at "insertion_eventA"
         backfill_points = self.get_success(
@@ -1011,7 +1048,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         # Only the backfill points that we didn't record earlier exist here.
-        self.assertListEqual(backfill_event_ids, ["insertion_eventB"])
+        self.assertEqual(backfill_event_ids, ["insertion_eventB"])
 
     def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_after_backoff_duration(
         self,
@@ -1069,7 +1106,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             )
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(backfill_event_ids, [])
+        self.assertEqual(backfill_event_ids, [])
 
         # Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
         # see if we can now backfill it
@@ -1083,7 +1120,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             )
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(backfill_event_ids, ["insertion_eventA"])
+        self.assertEqual(backfill_event_ids, ["insertion_eventA"])
 
 
 @attr.s