diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 80ee7e7b4e..b4b63a342a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -20,7 +20,16 @@ import itertools
import logging
from enum import Enum
from http import HTTPStatus
-from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
+from typing import (
+ TYPE_CHECKING,
+ Collection,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Tuple,
+ Union,
+)
import attr
from signedjson.key import decode_verify_key_bytes
@@ -34,6 +43,7 @@ from synapse.api.errors import (
CodeMessageException,
Codes,
FederationDeniedError,
+ FederationError,
HttpResponseException,
NotFoundError,
RequestSendFailed,
@@ -545,7 +555,8 @@ class FederationHandler:
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
- destination=origin,
+ initial_destination=origin,
+ other_destinations=ret.servers_in_room,
room_id=room_id,
)
@@ -1454,13 +1465,16 @@ class FederationHandler:
async def _sync_partial_state_room(
self,
- destination: str,
+ initial_destination: Optional[str],
+ other_destinations: Collection[str],
room_id: str,
) -> None:
"""Background process to resync the state of a partial-state room
Args:
- destination: homeserver to pull the state from
+ initial_destination: the initial homeserver to pull the state from
+ other_destinations: other homeservers to try to pull the state from, if
+ `initial_destination` is unavailable
room_id: room to be resynced
"""
@@ -1472,8 +1486,29 @@ class FederationHandler:
# really leave, that might mean we have difficulty getting the room state over
# federation.
#
- # TODO(faster_joins): try other destinations if the one we have fails
+ # TODO(faster_joins): we need some way of prioritising which homeservers in
+ # `other_destinations` to try first, otherwise we'll spend ages trying dead
+ # homeservers for large rooms.
+
+ if initial_destination is None and len(other_destinations) == 0:
+ raise ValueError(
+ f"Cannot resync state of {room_id}: no destinations provided"
+ )
+ # Make an infinite iterator of destinations to try. Once we find a working
+ # destination, we'll stick with it until it flakes.
+ # `destinations` must be bound on every path: the retry loop further down
+ # uses `len(destinations)` to detect when every server has been tried.
+ destinations = list(other_destinations)
+ if initial_destination is not None:
+     # Move `initial_destination` to the front of the list.
+     if initial_destination in destinations:
+         destinations.remove(initial_destination)
+     destinations = [initial_destination] + destinations
+ destination_iter = itertools.cycle(destinations)
+
+ # `destination` is the current remote homeserver we're pulling from.
+ destination = next(destination_iter)
logger.info("Syncing state for room %s via %s", room_id, destination)
# we work through the queue in order of increasing stream ordering.
@@ -1511,6 +1546,41 @@ class FederationHandler:
allow_rejected=True,
)
for event in events:
- await self._federation_event_handler.update_state_for_partial_state_event(
- destination, event
- )
+ for attempt in itertools.count():
+ try:
+ await self._federation_event_handler.update_state_for_partial_state_event(
+ destination, event
+ )
+ break
+ except FederationError as e:
+ if attempt == len(destinations) - 1:
+ # We have tried every remote server for this event. Give up.
+ # TODO(faster_joins) giving up isn't the right thing to do
+ # if there's a temporary network outage. retrying
+ # indefinitely is also not the right thing to do if we can
+ # reach all homeservers and they all claim they don't have
+ # the state we want.
+ logger.error(
+ "Failed to get state for %s at %s from %s because %s, "
+ "giving up!",
+ room_id,
+ event,
+ destination,
+ e,
+ )
+ raise
+
+ # Try the next remote server.
+ logger.info(
+ "Failed to get state for %s at %s from %s because %s",
+ room_id,
+ event,
+ destination,
+ e,
+ )
+ destination = next(destination_iter)
+ logger.info(
+ "Syncing state for room %s via %s instead",
+ room_id,
+ destination,
+ )
|