summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/13879.misc1
-rw-r--r--synapse/handlers/federation.py109
-rw-r--r--synapse/storage/databases/main/event_federation.py90
-rw-r--r--tests/storage/test_event_federation.py80
4 files changed, 198 insertions, 82 deletions
diff --git a/changelog.d/13879.misc b/changelog.d/13879.misc
new file mode 100644
index 0000000000..3cc2a2420f
--- /dev/null
+++ b/changelog.d/13879.misc
@@ -0,0 +1 @@
+Only pull relevant backfill points from the database based on the current depth and limit (instead of all) every time we want to `/backfill`.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 360ab6fee2..500c1c16d0 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -38,7 +38,7 @@ from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
 
 from synapse import event_auth
-from synapse.api.constants import EventContentFields, EventTypes, Membership
+from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -211,7 +211,7 @@ class FederationHandler:
         current_depth: int,
         limit: int,
         *,
-        processing_start_time: int,
+        processing_start_time: Optional[int],
     ) -> bool:
         """
         Checks whether the `current_depth` is at or approaching any backfill
@@ -223,12 +223,23 @@ class FederationHandler:
             room_id: The room to backfill in.
             current_depth: The depth to check at for any upcoming backfill points.
             limit: The max number of events to request from the remote federated server.
-            processing_start_time: The time when `maybe_backfill` started
-                processing. Only used for timing.
+            processing_start_time: The time when `maybe_backfill` started processing.
+                Only used for timing. If `None`, no timing observation will be made.
         """
         backwards_extremities = [
             _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
-            for event_id, depth in await self.store.get_backfill_points_in_room(room_id)
+            for event_id, depth in await self.store.get_backfill_points_in_room(
+                room_id=room_id,
+                current_depth=current_depth,
+                # We only need to end up with 5 extremities combined with the
+                # insertion event extremities to make the `/backfill` request
+                # but fetch an order of magnitude more to make sure there is
+                # enough even after we filter them by whether visible in the
+                # history. This isn't fool-proof as all backfill points within
+                # our limit could be filtered out but seems like a good amount
+                # to try with at least.
+                limit=50,
+            )
         ]
 
         insertion_events_to_be_backfilled: List[_BackfillPoint] = []
@@ -236,7 +247,12 @@ class FederationHandler:
             insertion_events_to_be_backfilled = [
                 _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
                 for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
-                    room_id
+                    room_id=room_id,
+                    current_depth=current_depth,
+                    # We only need to end up with 5 extremities combined with
+                    # the backfill points to make the `/backfill` request ...
+                    # (see the other comment above for more context).
+                    limit=50,
                 )
             ]
         logger.debug(
@@ -245,10 +261,6 @@ class FederationHandler:
             insertion_events_to_be_backfilled,
         )
 
-        if not backwards_extremities and not insertion_events_to_be_backfilled:
-            logger.debug("Not backfilling as no extremeties found.")
-            return False
-
         # we now have a list of potential places to backpaginate from. We prefer to
         # start with the most recent (ie, max depth), so let's sort the list.
         sorted_backfill_points: List[_BackfillPoint] = sorted(
@@ -269,6 +281,33 @@ class FederationHandler:
             sorted_backfill_points,
         )
 
+        # If we have no backfill points lower than the `current_depth` then
+        # either we can a) bail or b) still attempt to backfill. We opt to try
+        # backfilling anyway just in case we do get relevant events.
+        if not sorted_backfill_points and current_depth != MAX_DEPTH:
+            logger.debug(
+                "_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
+            )
+            return await self._maybe_backfill_inner(
+                room_id=room_id,
+                # We use `MAX_DEPTH` so that we find all backfill points next
+                # time (all events are below the `MAX_DEPTH`)
+                current_depth=MAX_DEPTH,
+                limit=limit,
+                # We don't want to start another timing observation from this
+                # nested recursive call. The top-most call can record the time
+                # overall otherwise the smaller one will throw off the results.
+                processing_start_time=None,
+            )
+
+        # Even after recursing with `MAX_DEPTH`, we didn't find any
+        # backward extremities to backfill from.
+        if not sorted_backfill_points:
+            logger.debug(
+                "_maybe_backfill_inner: Not backfilling as no backward extremeties found."
+            )
+            return False
+
         # If we're approaching an extremity we trigger a backfill, otherwise we
         # no-op.
         #
@@ -278,47 +317,16 @@ class FederationHandler:
         # chose more than one times the limit in case of failure, but choosing a
         # much larger factor will result in triggering a backfill request much
         # earlier than necessary.
-        #
-        # XXX: shouldn't we do this *after* the filter by depth below? Again, we don't
-        # care about events that have happened after our current position.
-        #
-        max_depth = sorted_backfill_points[0].depth
-        if current_depth - 2 * limit > max_depth:
+        max_depth_of_backfill_points = sorted_backfill_points[0].depth
+        if current_depth - 2 * limit > max_depth_of_backfill_points:
             logger.debug(
                 "Not backfilling as we don't need to. %d < %d - 2 * %d",
-                max_depth,
+                max_depth_of_backfill_points,
                 current_depth,
                 limit,
             )
             return False
 
-        # We ignore extremities that have a greater depth than our current depth
-        # as:
-        #    1. we don't really care about getting events that have happened
-        #       after our current position; and
-        #    2. we have likely previously tried and failed to backfill from that
-        #       extremity, so to avoid getting "stuck" requesting the same
-        #       backfill repeatedly we drop those extremities.
-        #
-        # However, we need to check that the filtered extremities are non-empty.
-        # If they are empty then either we can a) bail or b) still attempt to
-        # backfill. We opt to try backfilling anyway just in case we do get
-        # relevant events.
-        #
-        filtered_sorted_backfill_points = [
-            t for t in sorted_backfill_points if t.depth <= current_depth
-        ]
-        if filtered_sorted_backfill_points:
-            logger.debug(
-                "_maybe_backfill_inner: backfill points before current depth: %s",
-                filtered_sorted_backfill_points,
-            )
-            sorted_backfill_points = filtered_sorted_backfill_points
-        else:
-            logger.debug(
-                "_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
-            )
-
         # For performance's sake, we only want to paginate from a particular extremity
         # if we can actually see the events we'll get. Otherwise, we'd just spend a lot
         # of resources to get redacted events. We check each extremity in turn and
@@ -452,10 +460,15 @@ class FederationHandler:
 
             return False
 
-        processing_end_time = self.clock.time_msec()
-        backfill_processing_before_timer.observe(
-            (processing_end_time - processing_start_time) / 1000
-        )
+        # If we have the `processing_start_time`, then we can make an
+        # observation. We wouldn't have the `processing_start_time` in the case
+        # where `_maybe_backfill_inner` is recursively called to find any
+        # backfill points regardless of `current_depth`.
+        if processing_start_time is not None:
+            processing_end_time = self.clock.time_msec()
+            backfill_processing_before_timer.observe(
+                (processing_end_time - processing_start_time) / 1000
+            )
 
         success = await try_backfill(likely_domains)
         if success:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 3251fca6fb..17f2fd4458 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -726,17 +726,35 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
     async def get_backfill_points_in_room(
         self,
         room_id: str,
+        current_depth: int,
+        limit: int,
     ) -> List[Tuple[str, int]]:
         """
-        Gets the oldest events(backwards extremities) in the room along with the
-        approximate depth. Sorted by depth, highest to lowest (descending).
+        Get the backward extremities to backfill from in the room along with the
+        approximate depth.
+
+        Only returns events that are at a depth lower than or
+        equal to the `current_depth`. Sorted by depth, highest to lowest (descending)
+        so the closest events to the `current_depth` are first in the list.
+
+        We ignore extremities that are newer than the user's current scroll position
+        (ie, those with depth greater than `current_depth`) as:
+            1. we don't really care about getting events that have happened
+               after our current position; and
+            2. by the nature of paginating and scrolling back, we have likely
+               previously tried and failed to backfill from that extremity, so
+               to avoid getting "stuck" requesting the same backfill repeatedly
+               we drop those extremities.
 
         Args:
             room_id: Room where we want to find the oldest events
+            current_depth: The depth at the user's current scrollback position
+            limit: The max number of backfill points to return
 
         Returns:
             List of (event_id, depth) tuples. Sorted by depth, highest to lowest
-            (descending)
+            (descending) so the closest events to the `current_depth` are first
+            in the list.
         """
 
         def get_backfill_points_in_room_txn(
@@ -785,6 +803,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                      */
                     AND edge.is_state is ? /* False */
                     /**
+                     * We only want backwards extremities that are older than or at
+                     * the same position of the given `current_depth` (where older
+                     * means less than the given depth) because we're looking backwards
+                     * from the `current_depth` when backfilling.
+                     *
+                     *                         current_depth (ignore events that come after this, ignore 2-4)
+                     *                         |
+                     *                         ▼
+                     * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time>
+                     */
+                    AND event.depth <= ? /* current_depth */
+                    /**
                      * Exponential back-off (up to the upper bound) so we don't retry the
                      * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
                      *
@@ -798,11 +828,13 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                         OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
                     )
                 /**
-                 * Sort from highest to the lowest depth. Then tie-break on
-                 * alphabetical order of the event_ids so we get a consistent
-                 * ordering which is nice when asserting things in tests.
+                 * Sort from highest (closest to the `current_depth`) to the lowest depth
+                 * because the closest are most relevant to backfill from first.
+                 * Then tie-break on alphabetical order of the event_ids so we get a
+                 * consistent ordering which is nice when asserting things in tests.
                  */
                 ORDER BY event.depth DESC, backward_extrem.event_id DESC
+                LIMIT ?
             """
 
             if isinstance(self.database_engine, PostgresEngine):
@@ -817,9 +849,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                 (
                     room_id,
                     False,
+                    current_depth,
                     self._clock.time_msec(),
                     1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
                     1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+                    limit,
                 ),
             )
 
@@ -835,18 +869,34 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
     async def get_insertion_event_backward_extremities_in_room(
         self,
         room_id: str,
+        current_depth: int,
+        limit: int,
     ) -> List[Tuple[str, int]]:
         """
         Get the insertion events we know about that we haven't backfilled yet
-        along with the approximate depth. Sorted by depth, highest to lowest
-        (descending).
+        along with the approximate depth. Only returns insertion events that are
+        at a depth lower than or equal to the `current_depth`. Sorted by depth,
+        highest to lowest (descending) so the closest events to the
+        `current_depth` are first in the list.
+
+        We ignore insertion events that are newer than the user's current scroll
+        position (ie, those with depth greater than `current_depth`) as:
+            1. we don't really care about getting events that have happened
+               after our current position; and
+            2. by the nature of paginating and scrolling back, we have likely
+               previously tried and failed to backfill from that insertion event, so
+               to avoid getting "stuck" requesting the same backfill repeatedly
+               we drop those insertion event.
 
         Args:
             room_id: Room where we want to find the oldest events
+            current_depth: The depth at the user's current scrollback position
+            limit: The max number of insertion event extremities to return
 
         Returns:
             List of (event_id, depth) tuples. Sorted by depth, highest to lowest
-            (descending)
+            (descending) so the closest events to the `current_depth` are first
+            in the list.
         """
 
         def get_insertion_event_backward_extremities_in_room_txn(
@@ -870,6 +920,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                 WHERE
                     insertion_event_extremity.room_id = ?
                     /**
+                     * We only want extremities that are older than or at
+                     * the same position of the given `current_depth` (where older
+                     * means less than the given depth) because we're looking backwards
+                     * from the `current_depth` when backfilling.
+                     *
+                     *                         current_depth (ignore events that come after this, ignore 2-4)
+                     *                         |
+                     *                         ▼
+                     * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time>
+                     */
+                    AND event.depth <= ? /* current_depth */
+                    /**
                      * Exponential back-off (up to the upper bound) so we don't retry the
                      * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc
                      *
@@ -883,11 +945,13 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                         OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
                     )
                 /**
-                 * Sort from highest to the lowest depth. Then tie-break on
-                 * alphabetical order of the event_ids so we get a consistent
-                 * ordering which is nice when asserting things in tests.
+                 * Sort from highest (closest to the `current_depth`) to the lowest depth
+                 * because the closest are most relevant to backfill from first.
+                 * Then tie-break on alphabetical order of the event_ids so we get a
+                 * consistent ordering which is nice when asserting things in tests.
                  */
                 ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC
+                LIMIT ?
             """
 
             if isinstance(self.database_engine, PostgresEngine):
@@ -901,9 +965,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                 sql % (least_function,),
                 (
                     room_id,
+                    current_depth,
                     self._clock.time_msec(),
                     1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
                     1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+                    limit,
                 ),
             )
             return cast(List[Tuple[str, int]], txn.fetchall())
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 85739c464e..398f338b66 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -754,19 +754,31 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
 
     def test_get_backfill_points_in_room(self):
         """
-        Test to make sure we get some backfill points
+        Test to make sure only backfill points that are older and come before
+        the `current_depth` are returned.
         """
         setup_info = self._setup_room_for_backfill_tests()
         room_id = setup_info.room_id
+        depth_map = setup_info.depth_map
 
+        # Try at "B"
         backfill_points = self.get_success(
-            self.store.get_backfill_points_in_room(room_id)
+            self.store.get_backfill_points_in_room(room_id, depth_map["B"], limit=100)
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         self.assertListEqual(
             backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"]
         )
 
+        # Try at "A"
+        backfill_points = self.get_success(
+            self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
+        )
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        # Event "2" has a depth of 2 but is not included here because we only
+        # know the approximate depth of 5 from our event "3".
+        self.assertListEqual(backfill_event_ids, ["b3", "b2", "b1"])
+
     def test_get_backfill_points_in_room_excludes_events_we_have_attempted(
         self,
     ):
@@ -776,6 +788,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         """
         setup_info = self._setup_room_for_backfill_tests()
         room_id = setup_info.room_id
+        depth_map = setup_info.depth_map
 
         # Record some attempts to backfill these events which will make
         # `get_backfill_points_in_room` exclude them because we
@@ -795,8 +808,9 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
 
         # No time has passed since we attempted to backfill ^
 
+        # Try at "B"
         backfill_points = self.get_success(
-            self.store.get_backfill_points_in_room(room_id)
+            self.store.get_backfill_points_in_room(room_id, depth_map["B"], limit=100)
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         # Only the backfill points that we didn't record earlier exist here.
@@ -812,6 +826,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         """
         setup_info = self._setup_room_for_backfill_tests()
         room_id = setup_info.room_id
+        depth_map = setup_info.depth_map
 
         # Record some attempts to backfill these events which will make
         # `get_backfill_points_in_room` exclude them because we
@@ -839,26 +854,24 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         # visible regardless.
         self.reactor.advance(datetime.timedelta(hours=2).total_seconds())
 
-        # Make sure that "b1" is not in the list because we've
+        # Try at "A" and make sure that "b1" is not in the list because we've
         # already attempted many times
         backfill_points = self.get_success(
-            self.store.get_backfill_points_in_room(room_id)
+            self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2"])
+        self.assertListEqual(backfill_event_ids, ["b3", "b2"])
 
         # Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
         # see if we can now backfill it
         self.reactor.advance(datetime.timedelta(hours=20).total_seconds())
 
-        # Try again after we advanced enough time and we should see "b3" again
+        # Try at "A" again after we advanced enough time and we should see "b3" again
         backfill_points = self.get_success(
-            self.store.get_backfill_points_in_room(room_id)
+            self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(
-            backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"]
-        )
+        self.assertListEqual(backfill_event_ids, ["b3", "b2", "b1"])
 
     def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo:
         """
@@ -938,19 +951,35 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
 
     def test_get_insertion_event_backward_extremities_in_room(self):
         """
-        Test to make sure insertion event backward extremities are returned.
+        Test to make sure only insertion event backward extremities that are
+        older and come before the `current_depth` are returned.
         """
         setup_info = self._setup_room_for_insertion_backfill_tests()
         room_id = setup_info.room_id
+        depth_map = setup_info.depth_map
 
+        # Try at "insertion_eventB"
         backfill_points = self.get_success(
-            self.store.get_insertion_event_backward_extremities_in_room(room_id)
+            self.store.get_insertion_event_backward_extremities_in_room(
+                room_id, depth_map["insertion_eventB"], limit=100
+            )
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         self.assertListEqual(
             backfill_event_ids, ["insertion_eventB", "insertion_eventA"]
         )
 
+        # Try at "insertion_eventA"
+        backfill_points = self.get_success(
+            self.store.get_insertion_event_backward_extremities_in_room(
+                room_id, depth_map["insertion_eventA"], limit=100
+            )
+        )
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        # Event "2" has a depth of 2 but is not included here because we only
+        # know the approximate depth of 5 from our event "3".
+        self.assertListEqual(backfill_event_ids, ["insertion_eventA"])
+
     def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_have_attempted(
         self,
     ):
@@ -961,6 +990,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         """
         setup_info = self._setup_room_for_insertion_backfill_tests()
         room_id = setup_info.room_id
+        depth_map = setup_info.depth_map
 
         # Record some attempts to backfill these events which will make
         # `get_insertion_event_backward_extremities_in_room` exclude them
@@ -973,8 +1003,11 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
 
         # No time has passed since we attempted to backfill ^
 
+        # Try at "insertion_eventB"
         backfill_points = self.get_success(
-            self.store.get_insertion_event_backward_extremities_in_room(room_id)
+            self.store.get_insertion_event_backward_extremities_in_room(
+                room_id, depth_map["insertion_eventB"], limit=100
+            )
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         # Only the backfill points that we didn't record earlier exist here.
@@ -991,6 +1024,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         """
         setup_info = self._setup_room_for_insertion_backfill_tests()
         room_id = setup_info.room_id
+        depth_map = setup_info.depth_map
 
         # Record some attempts to backfill these events which will make
         # `get_backfill_points_in_room` exclude them because we
@@ -1027,13 +1061,15 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         # because we haven't waited long enough for this many attempts.
         self.reactor.advance(datetime.timedelta(hours=2).total_seconds())
 
-        # Make sure that "insertion_eventA" is not in the list because we've
-        # already attempted many times
+        # Try at "insertion_eventA" and make sure that "insertion_eventA" is not
+        # in the list because we've already attempted many times
         backfill_points = self.get_success(
-            self.store.get_insertion_event_backward_extremities_in_room(room_id)
+            self.store.get_insertion_event_backward_extremities_in_room(
+                room_id, depth_map["insertion_eventA"], limit=100
+            )
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(backfill_event_ids, ["insertion_eventB"])
+        self.assertListEqual(backfill_event_ids, [])
 
         # Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
         # see if we can now backfill it
@@ -1042,12 +1078,12 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         # Try at "insertion_eventA" again after we advanced enough time and we
         # should see "insertion_eventA" again
         backfill_points = self.get_success(
-            self.store.get_insertion_event_backward_extremities_in_room(room_id)
+            self.store.get_insertion_event_backward_extremities_in_room(
+                room_id, depth_map["insertion_eventA"], limit=100
+            )
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(
-            backfill_event_ids, ["insertion_eventB", "insertion_eventA"]
-        )
+        self.assertListEqual(backfill_event_ids, ["insertion_eventA"])
 
 
 @attr.s