diff --git a/changelog.d/15351.bugfix b/changelog.d/15351.bugfix
new file mode 100644
index 0000000000..e68023c671
--- /dev/null
+++ b/changelog.d/15351.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.70.0 where the background sync from a faster join could spin for hours when one of the events involved had been marked for backoff.
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 8c6822f3c6..f2d6f9ab2d 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -27,7 +27,7 @@ from synapse.util import json_decoder
if typing.TYPE_CHECKING:
from synapse.config.homeserver import HomeServerConfig
- from synapse.types import JsonDict
+ from synapse.types import JsonDict, StrCollection
logger = logging.getLogger(__name__)
@@ -682,18 +682,27 @@ class FederationPullAttemptBackoffError(RuntimeError):
Attributes:
event_id: The event_id which we are refusing to pull
message: A custom error message that gives more context
+ retry_after_ms: The remaining backoff interval, in milliseconds
"""
- def __init__(self, event_ids: List[str], message: Optional[str]):
- self.event_ids = event_ids
+ def __init__(
+ self, event_ids: "StrCollection", message: Optional[str], retry_after_ms: int
+ ):
+ event_ids = list(event_ids)
if message:
error_message = message
else:
- error_message = f"Not attempting to pull event_ids={self.event_ids} because we already tried to pull them recently (backing off)."
+ error_message = (
+ f"Not attempting to pull event_ids={event_ids} because we already "
+ "tried to pull them recently (backing off)."
+ )
super().__init__(error_message)
+ self.event_ids = event_ids
+ self.retry_after_ms = retry_after_ms
+
class HttpResponseException(CodeMessageException):
"""
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 80156ef343..65461a0787 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1949,27 +1949,25 @@ class FederationHandler:
)
for event in events:
for attempt in itertools.count():
+ # We try a new destination on every iteration.
try:
- await self._federation_event_handler.update_state_for_partial_state_event(
- destination, event
- )
+ while True:
+ try:
+ await self._federation_event_handler.update_state_for_partial_state_event(
+ destination, event
+ )
+ break
+ except FederationPullAttemptBackoffError as e:
+ # We are in the backoff period for one of the event's
+ # prev_events. Wait it out and try again after.
+ logger.warning(
+ "%s; waiting for %d ms...", e, e.retry_after_ms
+ )
+ await self.clock.sleep(e.retry_after_ms / 1000)
+
+ # Success, no need to try the rest of the destinations.
break
- except FederationPullAttemptBackoffError as exc:
- # Log a warning about why we failed to process the event (the error message
- # for `FederationPullAttemptBackoffError` is pretty good)
- logger.warning("_sync_partial_state_room: %s", exc)
- # We do not record a failed pull attempt when we backoff fetching a missing
- # `prev_event` because not being able to fetch the `prev_events` just means
- # we won't be able to de-outlier the pulled event. But we can still use an
- # `outlier` in the state/auth chain for another event. So we shouldn't stop
- # a downstream event from trying to pull it.
- #
- # This avoids a cascade of backoff for all events in the DAG downstream from
- # one event backoff upstream.
except FederationError as e:
- # TODO: We should `record_event_failed_pull_attempt` here,
- # see https://github.com/matrix-org/synapse/issues/13700
-
if attempt == len(destinations) - 1:
# We have tried every remote server for this event. Give up.
# TODO(faster_joins) giving up isn't the right thing to do
@@ -1986,6 +1984,8 @@ class FederationHandler:
destination,
e,
)
+ # TODO: We should `record_event_failed_pull_attempt` here,
+ # see https://github.com/matrix-org/synapse/issues/13700
raise
# Try the next remote server.
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 648843cdbe..982c8d3b2f 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -140,6 +140,7 @@ class FederationEventHandler:
"""
def __init__(self, hs: "HomeServer"):
+ self._clock = hs.get_clock()
self._store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
@@ -1038,8 +1039,8 @@ class FederationEventHandler:
Raises:
FederationPullAttemptBackoffError if we are are deliberately not attempting
- to pull the given event over federation because we've already done so
- recently and are backing off.
+ to pull one of the given event's `prev_event`s over federation because
+ we've already done so recently and are backing off.
FederationError if we fail to get the state from the remote server after any
missing `prev_event`s.
"""
@@ -1053,13 +1054,22 @@ class FederationEventHandler:
# If we've already recently attempted to pull this missing event, don't
# try it again so soon. Since we have to fetch all of the prev_events, we can
# bail early here if we find any to ignore.
- prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
- room_id, missing_prevs
+ prevs_with_pull_backoff = (
+ await self._store.get_event_ids_to_not_pull_from_backoff(
+ room_id, missing_prevs
+ )
)
- if len(prevs_to_ignore) > 0:
+ if len(prevs_with_pull_backoff) > 0:
raise FederationPullAttemptBackoffError(
- event_ids=prevs_to_ignore,
- message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
+ event_ids=prevs_with_pull_backoff.keys(),
+ message=(
+ f"While computing context for event={event_id}, not attempting to "
+ f"pull missing prev_events={list(prevs_with_pull_backoff.keys())} "
+ "because we already tried to pull recently (backing off)."
+ ),
+ retry_after_ms=(
+ max(prevs_with_pull_backoff.values()) - self._clock.time_msec()
+ ),
)
if not missing_prevs:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ff3edeb716..a19ba88bf8 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1544,7 +1544,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
self,
room_id: str,
event_ids: Collection[str],
- ) -> List[str]:
+ ) -> Dict[str, int]:
"""
Filter down the events to ones that we've failed to pull before recently. Uses
exponential backoff.
@@ -1554,7 +1554,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
event_ids: A list of events to filter down
Returns:
- List of event_ids that should not be attempted to be pulled
+ A dictionary of event_ids that should not be attempted to be pulled and the
+ next timestamp at which we may try pulling them again.
"""
event_failed_pull_attempts = await self.db_pool.simple_select_many_batch(
table="event_failed_pull_attempts",
@@ -1570,22 +1571,28 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
)
current_time = self._clock.time_msec()
- return [
- event_failed_pull_attempt["event_id"]
- for event_failed_pull_attempt in event_failed_pull_attempts
+
+ event_ids_with_backoff = {}
+ for event_failed_pull_attempt in event_failed_pull_attempts:
+ event_id = event_failed_pull_attempt["event_id"]
# Exponential back-off (up to the upper bound) so we don't try to
# pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
- if current_time
- < event_failed_pull_attempt["last_attempt_ts"]
- + (
- 2
- ** min(
- event_failed_pull_attempt["num_attempts"],
- BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+ backoff_end_time = (
+ event_failed_pull_attempt["last_attempt_ts"]
+ + (
+ 2
+ ** min(
+ event_failed_pull_attempt["num_attempts"],
+ BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+ )
)
+ * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
)
- * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
- ]
+
+ if current_time < backoff_end_time: # `backoff_end_time` is exclusive
+ event_ids_with_backoff[event_id] = backoff_end_time
+
+ return event_ids_with_backoff
async def get_missing_events(
self,
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 3e1984c15c..81e50bdd55 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -1143,19 +1143,24 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
tok = self.login("alice", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+ failure_time = self.clock.time_msec()
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "$failed_event_id", "fake cause"
)
)
- event_ids_to_backoff = self.get_success(
+ event_ids_with_backoff = self.get_success(
self.store.get_event_ids_to_not_pull_from_backoff(
room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"]
)
)
- self.assertEqual(event_ids_to_backoff, ["$failed_event_id"])
+ self.assertEqual(
+ event_ids_with_backoff,
+ # We expect a 2^1 hour backoff after a single failed attempt.
+ {"$failed_event_id": failure_time + 2 * 60 * 60 * 1000},
+ )
def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration(
self,
@@ -1179,14 +1184,14 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
# attempt (2^1 hours).
self.reactor.advance(datetime.timedelta(hours=2).total_seconds())
- event_ids_to_backoff = self.get_success(
+ event_ids_with_backoff = self.get_success(
self.store.get_event_ids_to_not_pull_from_backoff(
room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"]
)
)
# Since this function only returns events we should backoff from, time has
# elapsed past the backoff range so there is no events to backoff from.
- self.assertEqual(event_ids_to_backoff, [])
+ self.assertEqual(event_ids_with_backoff, {})
@attr.s(auto_attribs=True)
|