summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/13589.feature1
-rw-r--r--synapse/handlers/federation_event.py7
-rw-r--r--synapse/storage/databases/main/event_federation.py45
-rw-r--r--synapse/storage/databases/main/events.py32
-rw-r--r--synapse/storage/schema/__init__.py2
-rw-r--r--synapse/storage/schema/main/delta/73/01event_failed_pull_attempts.sql29
-rw-r--r--tests/handlers/test_federation_event.py222
7 files changed, 329 insertions, 9 deletions
diff --git a/changelog.d/13589.feature b/changelog.d/13589.feature
new file mode 100644
index 0000000000..78fa1ddb52
--- /dev/null
+++ b/changelog.d/13589.feature
@@ -0,0 +1 @@
+Keep track when we attempt to backfill an event but fail so we can intelligently back-off in the future.
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index ace7adcffb..9e065e1116 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -862,6 +862,9 @@ class FederationEventHandler:
             self._sanity_check_event(event)
         except SynapseError as err:
             logger.warning("Event %s failed sanity check: %s", event_id, err)
+            await self._store.record_event_failed_pull_attempt(
+                event.room_id, event_id, str(err)
+            )
             return
 
         try:
@@ -897,6 +900,10 @@ class FederationEventHandler:
                     backfilled=backfilled,
                 )
         except FederationError as e:
+            await self._store.record_event_failed_pull_attempt(
+                event.room_id, event_id, str(e)
+            )
+
             if e.code == 403:
                 logger.warning("Pulled event %s failed history check.", event_id)
             else:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ca47a22bf1..ef477978ed 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1294,6 +1294,51 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
         return event_id_results
 
+    @trace
+    async def record_event_failed_pull_attempt(
+        self, room_id: str, event_id: str, cause: str
+    ) -> None:
+        """
+        Record when we fail to pull an event over federation.
+
+        This information allows us to be more intelligent when we decide to
+        retry (we don't need to fail over and over) and we can process that
+        event in the background so we don't block on it each time.
+
+        Args:
+            room_id: The room where the event failed to pull from
+            event_id: The event that failed to be fetched or processed
+            cause: The error message or reason that we failed to pull the event
+        """
+        await self.db_pool.runInteraction(
+            "record_event_failed_pull_attempt",
+            self._record_event_failed_pull_attempt_upsert_txn,
+            room_id,
+            event_id,
+            cause,
+            db_autocommit=True,  # Safe as it's a single upsert
+        )
+
+    def _record_event_failed_pull_attempt_upsert_txn(
+        self,
+        txn: LoggingTransaction,
+        room_id: str,
+        event_id: str,
+        cause: str,
+    ) -> None:
+        sql = """
+            INSERT INTO event_failed_pull_attempts (
+                room_id, event_id, num_attempts, last_attempt_ts, last_cause
+            )
+                VALUES (?, ?, ?, ?, ?)
+            ON CONFLICT (room_id, event_id) DO UPDATE SET
+                num_attempts=event_failed_pull_attempts.num_attempts + 1,
+                last_attempt_ts=EXCLUDED.last_attempt_ts,
+                last_cause=EXCLUDED.last_cause;
+        """
+
+        txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
+
     async def get_missing_events(
         self,
         room_id: str,
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index c0b4080e4b..1b54a2eb57 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2435,17 +2435,31 @@ class PersistEventsStore:
             "DELETE FROM event_backward_extremities"
             " WHERE event_id = ? AND room_id = ?"
         )
+        backward_extremity_tuples_to_remove = [
+            (ev.event_id, ev.room_id)
+            for ev in events
+            if not ev.internal_metadata.is_outlier()
+            # If we encountered an event with no prev_events, then we might
+            # as well remove it now because it won't ever have anything else
+            # to backfill from.
+            or len(ev.prev_event_ids()) == 0
+        ]
         txn.execute_batch(
             query,
-            [
-                (ev.event_id, ev.room_id)
-                for ev in events
-                if not ev.internal_metadata.is_outlier()
-                # If we encountered an event with no prev_events, then we might
-                # as well remove it now because it won't ever have anything else
-                # to backfill from.
-                or len(ev.prev_event_ids()) == 0
-            ],
+            backward_extremity_tuples_to_remove,
+        )
+
+        # Clear out the failed backfill attempts after we successfully pulled
+        # the event. Since we no longer need these events as backward
+        # extremities, it also means that they won't be backfilled from again so
+        # we no longer need to store the backfill attempts around it.
+        query = """
+            DELETE FROM event_failed_pull_attempts
+            WHERE event_id = ? and room_id = ?
+        """
+        txn.execute_batch(
+            query,
+            backward_extremity_tuples_to_remove,
         )
 
 
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 38c9532bfd..68e055c664 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -81,6 +81,8 @@ Changes in SCHEMA_VERSION = 72:
 Changes in SCHEMA_VERSION = 73;
     - thread_id column is added to event_push_actions, event_push_actions_staging
       event_push_summary, receipts_linearized, and receipts_graph.
+    - Add table `event_failed_pull_attempts` to keep track when we fail to pull
+      events over federation.
 """
 
 
diff --git a/synapse/storage/schema/main/delta/73/01event_failed_pull_attempts.sql b/synapse/storage/schema/main/delta/73/01event_failed_pull_attempts.sql
new file mode 100644
index 0000000000..d397ee1082
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/01event_failed_pull_attempts.sql
@@ -0,0 +1,29 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Add a table that keeps track of when we failed to pull an event over
+-- federation (via /backfill, `/event`, `/get_missing_events`, etc). This allows
+-- us to be more intelligent when we decide to retry (we don't need to fail over
+-- and over) and we can process that event in the background so we don't block
+-- on it each time.
+CREATE TABLE IF NOT EXISTS event_failed_pull_attempts(
+    room_id TEXT NOT NULL REFERENCES rooms (room_id),
+    event_id TEXT NOT NULL,
+    num_attempts INT NOT NULL,
+    last_attempt_ts BIGINT NOT NULL,
+    last_cause TEXT NOT NULL,
+    PRIMARY KEY (room_id, event_id)
+);
diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py
index 51c8dd6498..b5b89405a4 100644
--- a/tests/handlers/test_federation_event.py
+++ b/tests/handlers/test_federation_event.py
@@ -227,3 +227,225 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
 
         if prev_exists_as_outlier:
             self.mock_federation_transport_client.get_event.assert_not_called()
+
+    def test_process_pulled_event_records_failed_backfill_attempts(
+        self,
+    ) -> None:
+        """
+        Test to make sure that failed backfill attempts for an event are
+        recorded in the `event_failed_pull_attempts` table.
+
+        In this test, we pretend we are processing a "pulled" event via
+        backfill. The pulled event has a fake `prev_event` which our server has
+        obviously never seen before so it attempts to request the state at that
+        `prev_event` which expectedly fails because it's a fake event. Because
+        the server can't fetch the state at the missing `prev_event`, the
+        "pulled" event fails the history check and is fails to process.
+
+        We check that we correctly record the number of failed pull attempts
+        of the pulled event and as a sanity check, that the "pulled" event isn't
+        persisted.
+        """
+        OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+        main_store = self.hs.get_datastores().main
+
+        # Create the room
+        user_id = self.register_user("kermit", "test")
+        tok = self.login("kermit", "test")
+        room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+        room_version = self.get_success(main_store.get_room_version(room_id))
+
+        # We expect an outbound request to /state_ids, so stub that out
+        self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable(
+            {
+                # Mimic the other server not knowing about the state at all.
+                # We want to cause Synapse to throw an error (`Unable to get
+                # missing prev_event $fake_prev_event`) and fail to backfill
+                # the pulled event.
+                "pdu_ids": [],
+                "auth_chain_ids": [],
+            }
+        )
+        # We also expect an outbound request to /state
+        self.mock_federation_transport_client.get_room_state.return_value = make_awaitable(
+            StateRequestResponse(
+                # Mimic the other server not knowing about the state at all.
+                # We want to cause Synapse to throw an error (`Unable to get
+                # missing prev_event $fake_prev_event`) and fail to backfill
+                # the pulled event.
+                auth_events=[],
+                state=[],
+            )
+        )
+
+        pulled_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "type": "test_regular_type",
+                    "room_id": room_id,
+                    "sender": OTHER_USER,
+                    "prev_events": [
+                        # The fake prev event will make the pulled event fail
+                        # the history check (`Unable to get missing prev_event
+                        # $fake_prev_event`)
+                        "$fake_prev_event"
+                    ],
+                    "auth_events": [],
+                    "origin_server_ts": 1,
+                    "depth": 12,
+                    "content": {"body": "pulled"},
+                }
+            ),
+            room_version,
+        )
+
+        # The function under test: try to process the pulled event
+        with LoggingContext("test"):
+            self.get_success(
+                self.hs.get_federation_event_handler()._process_pulled_event(
+                    self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+                )
+            )
+
+        # Make sure our failed pull attempt was recorded
+        backfill_num_attempts = self.get_success(
+            main_store.db_pool.simple_select_one_onecol(
+                table="event_failed_pull_attempts",
+                keyvalues={"event_id": pulled_event.event_id},
+                retcol="num_attempts",
+            )
+        )
+        self.assertEqual(backfill_num_attempts, 1)
+
+        # The function under test: try to process the pulled event again
+        with LoggingContext("test"):
+            self.get_success(
+                self.hs.get_federation_event_handler()._process_pulled_event(
+                    self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+                )
+            )
+
+        # Make sure our second failed pull attempt was recorded (`num_attempts` was incremented)
+        backfill_num_attempts = self.get_success(
+            main_store.db_pool.simple_select_one_onecol(
+                table="event_failed_pull_attempts",
+                keyvalues={"event_id": pulled_event.event_id},
+                retcol="num_attempts",
+            )
+        )
+        self.assertEqual(backfill_num_attempts, 2)
+
+        # And as a sanity check, make sure the event was not persisted through all of this.
+        persisted = self.get_success(
+            main_store.get_event(pulled_event.event_id, allow_none=True)
+        )
+        self.assertIsNone(
+            persisted,
+            "pulled event that fails the history check should not be persisted at all",
+        )
+
+    def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_persisted(
+        self,
+    ) -> None:
+        """
+        Test to make sure that failed pull attempts
+        (`event_failed_pull_attempts` table) for an event are cleared after the
+        event is successfully persisted.
+
+        In this test, we pretend we are processing a "pulled" event via
+        backfill. The pulled event succesfully processes and the backward
+        extremeties are updated along with clearing out any failed pull attempts
+        for those old extremities.
+
+        We check that we correctly cleared failed pull attempts of the
+        pulled event.
+        """
+        OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+        main_store = self.hs.get_datastores().main
+
+        # Create the room
+        user_id = self.register_user("kermit", "test")
+        tok = self.login("kermit", "test")
+        room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+        room_version = self.get_success(main_store.get_room_version(room_id))
+
+        # allow the remote user to send state events
+        self.helper.send_state(
+            room_id,
+            "m.room.power_levels",
+            {"events_default": 0, "state_default": 0},
+            tok=tok,
+        )
+
+        # add the remote user to the room
+        member_event = self.get_success(
+            event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
+        )
+
+        initial_state_map = self.get_success(
+            main_store.get_partial_current_state_ids(room_id)
+        )
+
+        auth_event_ids = [
+            initial_state_map[("m.room.create", "")],
+            initial_state_map[("m.room.power_levels", "")],
+            member_event.event_id,
+        ]
+
+        pulled_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "type": "test_regular_type",
+                    "room_id": room_id,
+                    "sender": OTHER_USER,
+                    "prev_events": [member_event.event_id],
+                    "auth_events": auth_event_ids,
+                    "origin_server_ts": 1,
+                    "depth": 12,
+                    "content": {"body": "pulled"},
+                }
+            ),
+            room_version,
+        )
+
+        # Fake the "pulled" event failing to backfill once so we can test
+        # if it's cleared out later on.
+        self.get_success(
+            main_store.record_event_failed_pull_attempt(
+                pulled_event.room_id, pulled_event.event_id, "fake cause"
+            )
+        )
+        # Make sure we have a failed pull attempt recorded for the pulled event
+        backfill_num_attempts = self.get_success(
+            main_store.db_pool.simple_select_one_onecol(
+                table="event_failed_pull_attempts",
+                keyvalues={"event_id": pulled_event.event_id},
+                retcol="num_attempts",
+            )
+        )
+        self.assertEqual(backfill_num_attempts, 1)
+
+        # The function under test: try to process the pulled event
+        with LoggingContext("test"):
+            self.get_success(
+                self.hs.get_federation_event_handler()._process_pulled_event(
+                    self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+                )
+            )
+
+        # Make sure the failed pull attempts for the pulled event are cleared
+        backfill_num_attempts = self.get_success(
+            main_store.db_pool.simple_select_one_onecol(
+                table="event_failed_pull_attempts",
+                keyvalues={"event_id": pulled_event.event_id},
+                retcol="num_attempts",
+                allow_none=True,
+            )
+        )
+        self.assertIsNone(backfill_num_attempts)
+
+        # And as a sanity check, make sure the "pulled" event was persisted.
+        persisted = self.get_success(
+            main_store.get_event(pulled_event.event_id, allow_none=True)
+        )
+        self.assertIsNotNone(persisted, "pulled event was not persisted at all")