summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/13816.feature1
-rw-r--r--synapse/api/errors.py21
-rw-r--r--synapse/handlers/federation.py16
-rw-r--r--synapse/handlers/federation_event.py31
-rw-r--r--synapse/storage/databases/main/event_federation.py54
-rw-r--r--tests/handlers/test_federation_event.py201
-rw-r--r--tests/storage/test_event_federation.py64
7 files changed, 386 insertions, 2 deletions
diff --git a/changelog.d/13816.feature b/changelog.d/13816.feature
new file mode 100644
index 0000000000..5eaa936b08
--- /dev/null
+++ b/changelog.d/13816.feature
@@ -0,0 +1 @@
+Stop fetching missing `prev_events` after we already know their signature is invalid.
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index c606207569..e0873b1913 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -640,6 +640,27 @@ class FederationError(RuntimeError):
         }
 
 
+class FederationPullAttemptBackoffError(RuntimeError):
+    """
+    Raised to indicate that we are are deliberately not attempting to pull the given
+    event over federation because we've already done so recently and are backing off.
+
+    Attributes:
+        event_id: The event_id which we are refusing to pull
+        message: A custom error message that gives more context
+    """
+
+    def __init__(self, event_ids: List[str], message: Optional[str]):
+        self.event_ids = event_ids
+
+        if message:
+            error_message = message
+        else:
+            error_message = f"Not attempting to pull event_ids={self.event_ids} because we already tried to pull them recently (backing off)."
+
+        super().__init__(error_message)
+
+
 class HttpResponseException(CodeMessageException):
     """
     Represents an HTTP-level failure of an outbound request
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 44e70c6c3c..5f7e0a1f79 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
     Codes,
     FederationDeniedError,
     FederationError,
+    FederationPullAttemptBackoffError,
     HttpResponseException,
     LimitExceededError,
     NotFoundError,
@@ -1720,7 +1721,22 @@ class FederationHandler:
                             destination, event
                         )
                         break
+                    except FederationPullAttemptBackoffError as exc:
+                        # Log a warning about why we failed to process the event (the error message
+                        # for `FederationPullAttemptBackoffError` is pretty good)
+                        logger.warning("_sync_partial_state_room: %s", exc)
+                        # We do not record a failed pull attempt when we backoff fetching a missing
+                        # `prev_event` because not being able to fetch the `prev_events` just means
+                        # we won't be able to de-outlier the pulled event. But we can still use an
+                        # `outlier` in the state/auth chain for another event. So we shouldn't stop
+                        # a downstream event from trying to pull it.
+                        #
+                        # This avoids a cascade of backoff for all events in the DAG downstream from
+                        # one event backoff upstream.
                     except FederationError as e:
+                        # TODO: We should `record_event_failed_pull_attempt` here,
+                        #   see https://github.com/matrix-org/synapse/issues/13700
+
                         if attempt == len(destinations) - 1:
                             # We have tried every remote server for this event. Give up.
                             # TODO(faster_joins) giving up isn't the right thing to do
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index f382961099..4300e8dd40 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -44,6 +44,7 @@ from synapse.api.errors import (
     AuthError,
     Codes,
     FederationError,
+    FederationPullAttemptBackoffError,
     HttpResponseException,
     RequestSendFailed,
     SynapseError,
@@ -567,6 +568,9 @@ class FederationEventHandler:
             event: partial-state event to be de-partial-stated
 
         Raises:
+            FederationPullAttemptBackoffError if we are are deliberately not attempting
+                to pull the given event over federation because we've already done so
+                recently and are backing off.
             FederationError if we fail to request state from the remote server.
         """
         logger.info("Updating state for %s", event.event_id)
@@ -901,6 +905,18 @@ class FederationEventHandler:
                     context,
                     backfilled=backfilled,
                 )
+        except FederationPullAttemptBackoffError as exc:
+            # Log a warning about why we failed to process the event (the error message
+            # for `FederationPullAttemptBackoffError` is pretty good)
+            logger.warning("_process_pulled_event: %s", exc)
+            # We do not record a failed pull attempt when we backoff fetching a missing
+            # `prev_event` because not being able to fetch the `prev_events` just means
+            # we won't be able to de-outlier the pulled event. But we can still use an
+            # `outlier` in the state/auth chain for another event. So we shouldn't stop
+            # a downstream event from trying to pull it.
+            #
+            # This avoids a cascade of backoff for all events in the DAG downstream from
+            # one event backoff upstream.
         except FederationError as e:
             await self._store.record_event_failed_pull_attempt(
                 event.room_id, event_id, str(e)
@@ -947,6 +963,9 @@ class FederationEventHandler:
             The event context.
 
         Raises:
+            FederationPullAttemptBackoffError if we are are deliberately not attempting
+                to pull the given event over federation because we've already done so
+                recently and are backing off.
             FederationError if we fail to get the state from the remote server after any
                 missing `prev_event`s.
         """
@@ -957,6 +976,18 @@ class FederationEventHandler:
         seen = await self._store.have_events_in_timeline(prevs)
         missing_prevs = prevs - seen
 
+        # If we've already recently attempted to pull this missing event, don't
+        # try it again so soon. Since we have to fetch all of the prev_events, we can
+        # bail early here if we find any to ignore.
+        prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
+            room_id, missing_prevs
+        )
+        if len(prevs_to_ignore) > 0:
+            raise FederationPullAttemptBackoffError(
+                event_ids=prevs_to_ignore,
+                message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
+            )
+
         if not missing_prevs:
             return await self._state_handler.compute_event_context(event)
 
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 6b9a629edd..309a4ba664 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1501,6 +1501,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             event_id: The event that failed to be fetched or processed
             cause: The error message or reason that we failed to pull the event
         """
+        logger.debug(
+            "record_event_failed_pull_attempt room_id=%s, event_id=%s, cause=%s",
+            room_id,
+            event_id,
+            cause,
+        )
         await self.db_pool.runInteraction(
             "record_event_failed_pull_attempt",
             self._record_event_failed_pull_attempt_upsert_txn,
@@ -1530,6 +1536,54 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
         txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
 
+    @trace
+    async def get_event_ids_to_not_pull_from_backoff(
+        self,
+        room_id: str,
+        event_ids: Collection[str],
+    ) -> List[str]:
+        """
+        Filter down the events to ones that we've failed to pull before recently. Uses
+        exponential backoff.
+
+        Args:
+            room_id: The room that the events belong to
+            event_ids: A list of events to filter down
+
+        Returns:
+            List of event_ids that should not be attempted to be pulled
+        """
+        event_failed_pull_attempts = await self.db_pool.simple_select_many_batch(
+            table="event_failed_pull_attempts",
+            column="event_id",
+            iterable=event_ids,
+            keyvalues={},
+            retcols=(
+                "event_id",
+                "last_attempt_ts",
+                "num_attempts",
+            ),
+            desc="get_event_ids_to_not_pull_from_backoff",
+        )
+
+        current_time = self._clock.time_msec()
+        return [
+            event_failed_pull_attempt["event_id"]
+            for event_failed_pull_attempt in event_failed_pull_attempts
+            # Exponential back-off (up to the upper bound) so we don't try to
+            # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
+            if current_time
+            < event_failed_pull_attempt["last_attempt_ts"]
+            + (
+                2
+                ** min(
+                    event_failed_pull_attempt["num_attempts"],
+                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+                )
+            )
+            * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
+        ]
+
     async def get_missing_events(
         self,
         room_id: str,
diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py
index 918010cddb..e448cb1901 100644
--- a/tests/handlers/test_federation_event.py
+++ b/tests/handlers/test_federation_event.py
@@ -14,7 +14,7 @@
 from typing import Optional
 from unittest import mock
 
-from synapse.api.errors import AuthError
+from synapse.api.errors import AuthError, StoreError
 from synapse.api.room_versions import RoomVersion
 from synapse.event_auth import (
     check_state_dependent_auth_rules,
@@ -43,7 +43,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
     def make_homeserver(self, reactor, clock):
         # mock out the federation transport client
         self.mock_federation_transport_client = mock.Mock(
-            spec=["get_room_state_ids", "get_room_state", "get_event"]
+            spec=["get_room_state_ids", "get_room_state", "get_event", "backfill"]
         )
         return super().setup_test_homeserver(
             federation_transport_client=self.mock_federation_transport_client
@@ -459,6 +459,203 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
         )
         self.assertIsNotNone(persisted, "pulled event was not persisted at all")
 
+    def test_backfill_signature_failure_does_not_fetch_same_prev_event_later(
+        self,
+    ) -> None:
+        """
+        Test to make sure we backoff and don't try to fetch a missing prev_event when we
+        already know it has a invalid signature from checking the signatures of all of
+        the events in the backfill response.
+        """
+        OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+        main_store = self.hs.get_datastores().main
+
+        # Create the room
+        user_id = self.register_user("kermit", "test")
+        tok = self.login("kermit", "test")
+        room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+        room_version = self.get_success(main_store.get_room_version(room_id))
+
+        # Allow the remote user to send state events
+        self.helper.send_state(
+            room_id,
+            "m.room.power_levels",
+            {"events_default": 0, "state_default": 0},
+            tok=tok,
+        )
+
+        # Add the remote user to the room
+        member_event = self.get_success(
+            event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
+        )
+
+        initial_state_map = self.get_success(
+            main_store.get_partial_current_state_ids(room_id)
+        )
+
+        auth_event_ids = [
+            initial_state_map[("m.room.create", "")],
+            initial_state_map[("m.room.power_levels", "")],
+            member_event.event_id,
+        ]
+
+        # We purposely don't run `add_hashes_and_signatures_from_other_server`
+        # over this because we want the signature check to fail.
+        pulled_event_without_signatures = make_event_from_dict(
+            {
+                "type": "test_regular_type",
+                "room_id": room_id,
+                "sender": OTHER_USER,
+                "prev_events": [member_event.event_id],
+                "auth_events": auth_event_ids,
+                "origin_server_ts": 1,
+                "depth": 12,
+                "content": {"body": "pulled_event_without_signatures"},
+            },
+            room_version,
+        )
+
+        # Create a regular event that should pass except for the
+        # `pulled_event_without_signatures` in the `prev_event`.
+        pulled_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "type": "test_regular_type",
+                    "room_id": room_id,
+                    "sender": OTHER_USER,
+                    "prev_events": [
+                        member_event.event_id,
+                        pulled_event_without_signatures.event_id,
+                    ],
+                    "auth_events": auth_event_ids,
+                    "origin_server_ts": 1,
+                    "depth": 12,
+                    "content": {"body": "pulled_event"},
+                }
+            ),
+            room_version,
+        )
+
+        # We expect an outbound request to /backfill, so stub that out
+        self.mock_federation_transport_client.backfill.return_value = make_awaitable(
+            {
+                "origin": self.OTHER_SERVER_NAME,
+                "origin_server_ts": 123,
+                "pdus": [
+                    # This is one of the important aspects of this test: we include
+                    # `pulled_event_without_signatures` so it fails the signature check
+                    # when we filter down the backfill response down to events which
+                    # have valid signatures in
+                    # `_check_sigs_and_hash_for_pulled_events_and_fetch`
+                    pulled_event_without_signatures.get_pdu_json(),
+                    # Then later when we process this valid signature event, when we
+                    # fetch the missing `prev_event`s, we want to make sure that we
+                    # backoff and don't try and fetch `pulled_event_without_signatures`
+                    # again since we know it just had an invalid signature.
+                    pulled_event.get_pdu_json(),
+                ],
+            }
+        )
+
+        # Keep track of the count and make sure we don't make any of these requests
+        event_endpoint_requested_count = 0
+        room_state_ids_endpoint_requested_count = 0
+        room_state_endpoint_requested_count = 0
+
+        async def get_event(
+            destination: str, event_id: str, timeout: Optional[int] = None
+        ) -> None:
+            nonlocal event_endpoint_requested_count
+            event_endpoint_requested_count += 1
+
+        async def get_room_state_ids(
+            destination: str, room_id: str, event_id: str
+        ) -> None:
+            nonlocal room_state_ids_endpoint_requested_count
+            room_state_ids_endpoint_requested_count += 1
+
+        async def get_room_state(
+            room_version: RoomVersion, destination: str, room_id: str, event_id: str
+        ) -> None:
+            nonlocal room_state_endpoint_requested_count
+            room_state_endpoint_requested_count += 1
+
+        # We don't expect an outbound request to `/event`, `/state_ids`, or `/state` in
+        # the happy path but if the logic is sneaking around what we expect, stub that
+        # out so we can detect that failure
+        self.mock_federation_transport_client.get_event.side_effect = get_event
+        self.mock_federation_transport_client.get_room_state_ids.side_effect = (
+            get_room_state_ids
+        )
+        self.mock_federation_transport_client.get_room_state.side_effect = (
+            get_room_state
+        )
+
+        # The function under test: try to backfill and process the pulled event
+        with LoggingContext("test"):
+            self.get_success(
+                self.hs.get_federation_event_handler().backfill(
+                    self.OTHER_SERVER_NAME,
+                    room_id,
+                    limit=1,
+                    extremities=["$some_extremity"],
+                )
+            )
+
+        if event_endpoint_requested_count > 0:
+            self.fail(
+                "We don't expect an outbound request to /event in the happy path but if "
+                "the logic is sneaking around what we expect, make sure to fail the test. "
+                "We don't expect it because the signature failure should cause us to backoff "
+                "and not asking about pulled_event_without_signatures="
+                f"{pulled_event_without_signatures.event_id} again"
+            )
+
+        if room_state_ids_endpoint_requested_count > 0:
+            self.fail(
+                "We don't expect an outbound request to /state_ids in the happy path but if "
+                "the logic is sneaking around what we expect, make sure to fail the test. "
+                "We don't expect it because the signature failure should cause us to backoff "
+                "and not asking about pulled_event_without_signatures="
+                f"{pulled_event_without_signatures.event_id} again"
+            )
+
+        if room_state_endpoint_requested_count > 0:
+            self.fail(
+                "We don't expect an outbound request to /state in the happy path but if "
+                "the logic is sneaking around what we expect, make sure to fail the test. "
+                "We don't expect it because the signature failure should cause us to backoff "
+                "and not asking about pulled_event_without_signatures="
+                f"{pulled_event_without_signatures.event_id} again"
+            )
+
+        # Make sure we only recorded a single failure which corresponds to the signature
+        # failure initially in `_check_sigs_and_hash_for_pulled_events_and_fetch` before
+        # we process all of the pulled events.
+        backfill_num_attempts_for_event_without_signatures = self.get_success(
+            main_store.db_pool.simple_select_one_onecol(
+                table="event_failed_pull_attempts",
+                keyvalues={"event_id": pulled_event_without_signatures.event_id},
+                retcol="num_attempts",
+            )
+        )
+        self.assertEqual(backfill_num_attempts_for_event_without_signatures, 1)
+
+        # And make sure we didn't record a failure for the event that has the missing
+        # prev_event because we don't want to cause a cascade of failures. Not being
+        # able to fetch the `prev_events` just means we won't be able to de-outlier the
+        # pulled event. But we can still use an `outlier` in the state/auth chain for
+        # another event. So we shouldn't stop a downstream event from trying to pull it.
+        self.get_failure(
+            main_store.db_pool.simple_select_one_onecol(
+                table="event_failed_pull_attempts",
+                keyvalues={"event_id": pulled_event.event_id},
+                retcol="num_attempts",
+            ),
+            # StoreError: 404: No row found
+            StoreError,
+        )
+
     def test_process_pulled_event_with_rejected_missing_state(self) -> None:
         """Ensure that we correctly handle pulled events with missing state containing a
         rejected state event
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 59b8910907..853db930d6 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -27,6 +27,8 @@ from synapse.api.room_versions import (
     RoomVersion,
 )
 from synapse.events import _EventInternalMetadata
+from synapse.rest import admin
+from synapse.rest.client import login, room
 from synapse.server import HomeServer
 from synapse.storage.database import LoggingTransaction
 from synapse.types import JsonDict
@@ -43,6 +45,12 @@ class _BackfillSetupInfo:
 
 
 class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main
 
@@ -1122,6 +1130,62 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         self.assertEqual(backfill_event_ids, ["insertion_eventA"])
 
+    def test_get_event_ids_to_not_pull_from_backoff(
+        self,
+    ):
+        """
+        Test to make sure only event IDs we should backoff from are returned.
+        """
+        # Create the room
+        user_id = self.register_user("alice", "test")
+        tok = self.login("alice", "test")
+        room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(
+                room_id, "$failed_event_id", "fake cause"
+            )
+        )
+
+        event_ids_to_backoff = self.get_success(
+            self.store.get_event_ids_to_not_pull_from_backoff(
+                room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"]
+            )
+        )
+
+        self.assertEqual(event_ids_to_backoff, ["$failed_event_id"])
+
+    def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration(
+        self,
+    ):
+        """
+        Test to make sure no event IDs are returned after the backoff duration has
+        elapsed.
+        """
+        # Create the room
+        user_id = self.register_user("alice", "test")
+        tok = self.login("alice", "test")
+        room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+
+        self.get_success(
+            self.store.record_event_failed_pull_attempt(
+                room_id, "$failed_event_id", "fake cause"
+            )
+        )
+
+        # Now advance time by 2 hours so we wait long enough for the single failed
+        # attempt (2^1 hours).
+        self.reactor.advance(datetime.timedelta(hours=2).total_seconds())
+
+        event_ids_to_backoff = self.get_success(
+            self.store.get_event_ids_to_not_pull_from_backoff(
+                room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"]
+            )
+        )
+        # Since this function only returns events we should backoff from, time has
+        # elapsed past the backoff range so there is no events to backoff from.
+        self.assertEqual(event_ids_to_backoff, [])
+
 
 @attr.s
 class FakeEvent: