diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py
index 51c8dd6498..b5b89405a4 100644
--- a/tests/handlers/test_federation_event.py
+++ b/tests/handlers/test_federation_event.py
@@ -227,3 +227,225 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
if prev_exists_as_outlier:
self.mock_federation_transport_client.get_event.assert_not_called()
+
+ def test_process_pulled_event_records_failed_backfill_attempts(
+ self,
+ ) -> None:
+ """
+ Test to make sure that failed backfill attempts for an event are
+ recorded in the `event_failed_pull_attempts` table.
+
+ In this test, we pretend we are processing a "pulled" event via
+ backfill. The pulled event has a fake `prev_event` which our server has
+ obviously never seen before so it attempts to request the state at that
+ `prev_event` which expectedly fails because it's a fake event. Because
+ the server can't fetch the state at the missing `prev_event`, the
+        "pulled" event fails the history check and fails to process.
+
+ We check that we correctly record the number of failed pull attempts
+ of the pulled event and as a sanity check, that the "pulled" event isn't
+ persisted.
+ """
+ OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+ main_store = self.hs.get_datastores().main
+
+ # Create the room
+ user_id = self.register_user("kermit", "test")
+ tok = self.login("kermit", "test")
+ room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+ room_version = self.get_success(main_store.get_room_version(room_id))
+
+ # We expect an outbound request to /state_ids, so stub that out
+ self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable(
+ {
+ # Mimic the other server not knowing about the state at all.
+ # We want to cause Synapse to throw an error (`Unable to get
+ # missing prev_event $fake_prev_event`) and fail to backfill
+ # the pulled event.
+ "pdu_ids": [],
+ "auth_chain_ids": [],
+ }
+ )
+ # We also expect an outbound request to /state
+ self.mock_federation_transport_client.get_room_state.return_value = make_awaitable(
+ StateRequestResponse(
+ # Mimic the other server not knowing about the state at all.
+ # We want to cause Synapse to throw an error (`Unable to get
+ # missing prev_event $fake_prev_event`) and fail to backfill
+ # the pulled event.
+ auth_events=[],
+ state=[],
+ )
+ )
+
+ pulled_event = make_event_from_dict(
+ self.add_hashes_and_signatures_from_other_server(
+ {
+ "type": "test_regular_type",
+ "room_id": room_id,
+ "sender": OTHER_USER,
+ "prev_events": [
+ # The fake prev event will make the pulled event fail
+ # the history check (`Unable to get missing prev_event
+ # $fake_prev_event`)
+ "$fake_prev_event"
+ ],
+ "auth_events": [],
+ "origin_server_ts": 1,
+ "depth": 12,
+ "content": {"body": "pulled"},
+ }
+ ),
+ room_version,
+ )
+
+ # The function under test: try to process the pulled event
+ with LoggingContext("test"):
+ self.get_success(
+ self.hs.get_federation_event_handler()._process_pulled_event(
+ self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+ )
+ )
+
+ # Make sure our failed pull attempt was recorded
+ backfill_num_attempts = self.get_success(
+ main_store.db_pool.simple_select_one_onecol(
+ table="event_failed_pull_attempts",
+ keyvalues={"event_id": pulled_event.event_id},
+ retcol="num_attempts",
+ )
+ )
+ self.assertEqual(backfill_num_attempts, 1)
+
+ # The function under test: try to process the pulled event again
+ with LoggingContext("test"):
+ self.get_success(
+ self.hs.get_federation_event_handler()._process_pulled_event(
+ self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+ )
+ )
+
+ # Make sure our second failed pull attempt was recorded (`num_attempts` was incremented)
+ backfill_num_attempts = self.get_success(
+ main_store.db_pool.simple_select_one_onecol(
+ table="event_failed_pull_attempts",
+ keyvalues={"event_id": pulled_event.event_id},
+ retcol="num_attempts",
+ )
+ )
+ self.assertEqual(backfill_num_attempts, 2)
+
+ # And as a sanity check, make sure the event was not persisted through all of this.
+ persisted = self.get_success(
+ main_store.get_event(pulled_event.event_id, allow_none=True)
+ )
+ self.assertIsNone(
+ persisted,
+ "pulled event that fails the history check should not be persisted at all",
+ )
+
+ def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_persisted(
+ self,
+ ) -> None:
+ """
+ Test to make sure that failed pull attempts
+ (`event_failed_pull_attempts` table) for an event are cleared after the
+ event is successfully persisted.
+
+ In this test, we pretend we are processing a "pulled" event via
+        backfill. The pulled event successfully processes and the backward
+        extremities are updated along with clearing out any failed pull attempts
+ for those old extremities.
+
+ We check that we correctly cleared failed pull attempts of the
+ pulled event.
+ """
+ OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+ main_store = self.hs.get_datastores().main
+
+ # Create the room
+ user_id = self.register_user("kermit", "test")
+ tok = self.login("kermit", "test")
+ room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+ room_version = self.get_success(main_store.get_room_version(room_id))
+
+ # allow the remote user to send state events
+ self.helper.send_state(
+ room_id,
+ "m.room.power_levels",
+ {"events_default": 0, "state_default": 0},
+ tok=tok,
+ )
+
+ # add the remote user to the room
+ member_event = self.get_success(
+ event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
+ )
+
+ initial_state_map = self.get_success(
+ main_store.get_partial_current_state_ids(room_id)
+ )
+
+ auth_event_ids = [
+ initial_state_map[("m.room.create", "")],
+ initial_state_map[("m.room.power_levels", "")],
+ member_event.event_id,
+ ]
+
+ pulled_event = make_event_from_dict(
+ self.add_hashes_and_signatures_from_other_server(
+ {
+ "type": "test_regular_type",
+ "room_id": room_id,
+ "sender": OTHER_USER,
+ "prev_events": [member_event.event_id],
+ "auth_events": auth_event_ids,
+ "origin_server_ts": 1,
+ "depth": 12,
+ "content": {"body": "pulled"},
+ }
+ ),
+ room_version,
+ )
+
+ # Fake the "pulled" event failing to backfill once so we can test
+ # if it's cleared out later on.
+ self.get_success(
+ main_store.record_event_failed_pull_attempt(
+ pulled_event.room_id, pulled_event.event_id, "fake cause"
+ )
+ )
+ # Make sure we have a failed pull attempt recorded for the pulled event
+ backfill_num_attempts = self.get_success(
+ main_store.db_pool.simple_select_one_onecol(
+ table="event_failed_pull_attempts",
+ keyvalues={"event_id": pulled_event.event_id},
+ retcol="num_attempts",
+ )
+ )
+ self.assertEqual(backfill_num_attempts, 1)
+
+ # The function under test: try to process the pulled event
+ with LoggingContext("test"):
+ self.get_success(
+ self.hs.get_federation_event_handler()._process_pulled_event(
+ self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+ )
+ )
+
+ # Make sure the failed pull attempts for the pulled event are cleared
+ backfill_num_attempts = self.get_success(
+ main_store.db_pool.simple_select_one_onecol(
+ table="event_failed_pull_attempts",
+ keyvalues={"event_id": pulled_event.event_id},
+ retcol="num_attempts",
+ allow_none=True,
+ )
+ )
+ self.assertIsNone(backfill_num_attempts)
+
+ # And as a sanity check, make sure the "pulled" event was persisted.
+ persisted = self.get_success(
+ main_store.get_event(pulled_event.event_id, allow_none=True)
+ )
+ self.assertIsNotNone(persisted, "pulled event was not persisted at all")
|