summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/errors.py17
-rw-r--r--synapse/handlers/federation.py36
-rw-r--r--synapse/handlers/federation_event.py24
-rw-r--r--synapse/storage/databases/main/event_federation.py35
4 files changed, 69 insertions, 43 deletions
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 8c6822f3c6..f2d6f9ab2d 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -27,7 +27,7 @@ from synapse.util import json_decoder
 
 if typing.TYPE_CHECKING:
     from synapse.config.homeserver import HomeServerConfig
-    from synapse.types import JsonDict
+    from synapse.types import JsonDict, StrCollection
 
 logger = logging.getLogger(__name__)
 
@@ -682,18 +682,27 @@ class FederationPullAttemptBackoffError(RuntimeError):
     Attributes:
         event_id: The event_id which we are refusing to pull
         message: A custom error message that gives more context
+        retry_after_ms: The remaining backoff interval, in milliseconds
     """
 
-    def __init__(self, event_ids: List[str], message: Optional[str]):
-        self.event_ids = event_ids
+    def __init__(
+        self, event_ids: "StrCollection", message: Optional[str], retry_after_ms: int
+    ):
+        event_ids = list(event_ids)
 
         if message:
             error_message = message
         else:
-            error_message = f"Not attempting to pull event_ids={self.event_ids} because we already tried to pull them recently (backing off)."
+            error_message = (
+                f"Not attempting to pull event_ids={event_ids} because we already "
+                "tried to pull them recently (backing off)."
+            )
 
         super().__init__(error_message)
 
+        self.event_ids = event_ids
+        self.retry_after_ms = retry_after_ms
+
 
 class HttpResponseException(CodeMessageException):
     """
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 80156ef343..65461a0787 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1949,27 +1949,25 @@ class FederationHandler:
             )
             for event in events:
                 for attempt in itertools.count():
+                    # We try a new destination on every iteration.
                     try:
-                        await self._federation_event_handler.update_state_for_partial_state_event(
-                            destination, event
-                        )
+                        while True:
+                            try:
+                                await self._federation_event_handler.update_state_for_partial_state_event(
+                                    destination, event
+                                )
+                                break
+                            except FederationPullAttemptBackoffError as e:
+                                # We are in the backoff period for one of the event's
+                                # prev_events. Wait it out and try again after.
+                                logger.warning(
+                                    "%s; waiting for %d ms...", e, e.retry_after_ms
+                                )
+                                await self.clock.sleep(e.retry_after_ms / 1000)
+
+                        # Success, no need to try the rest of the destinations.
                         break
-                    except FederationPullAttemptBackoffError as exc:
-                        # Log a warning about why we failed to process the event (the error message
-                        # for `FederationPullAttemptBackoffError` is pretty good)
-                        logger.warning("_sync_partial_state_room: %s", exc)
-                        # We do not record a failed pull attempt when we backoff fetching a missing
-                        # `prev_event` because not being able to fetch the `prev_events` just means
-                        # we won't be able to de-outlier the pulled event. But we can still use an
-                        # `outlier` in the state/auth chain for another event. So we shouldn't stop
-                        # a downstream event from trying to pull it.
-                        #
-                        # This avoids a cascade of backoff for all events in the DAG downstream from
-                        # one event backoff upstream.
                     except FederationError as e:
-                        # TODO: We should `record_event_failed_pull_attempt` here,
-                        #   see https://github.com/matrix-org/synapse/issues/13700
-
                         if attempt == len(destinations) - 1:
                             # We have tried every remote server for this event. Give up.
                             # TODO(faster_joins) giving up isn't the right thing to do
@@ -1986,6 +1984,8 @@ class FederationHandler:
                                 destination,
                                 e,
                             )
+                            # TODO: We should `record_event_failed_pull_attempt` here,
+                            #   see https://github.com/matrix-org/synapse/issues/13700
                             raise
 
                         # Try the next remote server.
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 648843cdbe..982c8d3b2f 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -140,6 +140,7 @@ class FederationEventHandler:
     """
 
     def __init__(self, hs: "HomeServer"):
+        self._clock = hs.get_clock()
         self._store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state
@@ -1038,8 +1039,8 @@ class FederationEventHandler:
 
         Raises:
             FederationPullAttemptBackoffError if we are are deliberately not attempting
-                to pull the given event over federation because we've already done so
-                recently and are backing off.
+                to pull one of the given event's `prev_event`s over federation because
+                we've already done so recently and are backing off.
             FederationError if we fail to get the state from the remote server after any
                 missing `prev_event`s.
         """
@@ -1053,13 +1054,22 @@ class FederationEventHandler:
         # If we've already recently attempted to pull this missing event, don't
         # try it again so soon. Since we have to fetch all of the prev_events, we can
         # bail early here if we find any to ignore.
-        prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
-            room_id, missing_prevs
+        prevs_with_pull_backoff = (
+            await self._store.get_event_ids_to_not_pull_from_backoff(
+                room_id, missing_prevs
+            )
         )
-        if len(prevs_to_ignore) > 0:
+        if len(prevs_with_pull_backoff) > 0:
             raise FederationPullAttemptBackoffError(
-                event_ids=prevs_to_ignore,
-                message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
+                event_ids=prevs_with_pull_backoff.keys(),
+                message=(
+                    f"While computing context for event={event_id}, not attempting to "
+                    f"pull missing prev_events={list(prevs_with_pull_backoff.keys())} "
+                    "because we already tried to pull recently (backing off)."
+                ),
+                retry_after_ms=(
+                    max(prevs_with_pull_backoff.values()) - self._clock.time_msec()
+                ),
             )
 
         if not missing_prevs:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ff3edeb716..a19ba88bf8 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1544,7 +1544,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         self,
         room_id: str,
         event_ids: Collection[str],
-    ) -> List[str]:
+    ) -> Dict[str, int]:
         """
         Filter down the events to ones that we've failed to pull before recently. Uses
         exponential backoff.
@@ -1554,7 +1554,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             event_ids: A list of events to filter down
 
         Returns:
-            List of event_ids that should not be attempted to be pulled
+            A dictionary of event_ids that should not be attempted to be pulled and the
+            next timestamp at which we may try pulling them again.
         """
         event_failed_pull_attempts = await self.db_pool.simple_select_many_batch(
             table="event_failed_pull_attempts",
@@ -1570,22 +1571,28 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         )
 
         current_time = self._clock.time_msec()
-        return [
-            event_failed_pull_attempt["event_id"]
-            for event_failed_pull_attempt in event_failed_pull_attempts
+
+        event_ids_with_backoff = {}
+        for event_failed_pull_attempt in event_failed_pull_attempts:
+            event_id = event_failed_pull_attempt["event_id"]
             # Exponential back-off (up to the upper bound) so we don't try to
             # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
-            if current_time
-            < event_failed_pull_attempt["last_attempt_ts"]
-            + (
-                2
-                ** min(
-                    event_failed_pull_attempt["num_attempts"],
-                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+            backoff_end_time = (
+                event_failed_pull_attempt["last_attempt_ts"]
+                + (
+                    2
+                    ** min(
+                        event_failed_pull_attempt["num_attempts"],
+                        BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+                    )
                 )
+                * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
             )
-            * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
-        ]
+
+            if current_time < backoff_end_time:  # `backoff_end_time` is exclusive
+                event_ids_with_backoff[event_id] = backoff_end_time
+
+        return event_ids_with_backoff
 
     async def get_missing_events(
         self,