summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py86
1 files changed, 78 insertions, 8 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 80ee7e7b4e..b4b63a342a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -20,7 +20,16 @@ import itertools
 import logging
 from enum import Enum
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Tuple,
+    Union,
+)
 
 import attr
 from signedjson.key import decode_verify_key_bytes
@@ -34,6 +43,7 @@ from synapse.api.errors import (
     CodeMessageException,
     Codes,
     FederationDeniedError,
+    FederationError,
     HttpResponseException,
     NotFoundError,
     RequestSendFailed,
@@ -545,7 +555,8 @@ class FederationHandler:
                 run_as_background_process(
                     desc="sync_partial_state_room",
                     func=self._sync_partial_state_room,
-                    destination=origin,
+                    initial_destination=origin,
+                    other_destinations=ret.servers_in_room,
                     room_id=room_id,
                 )
 
@@ -1454,13 +1465,16 @@ class FederationHandler:
 
     async def _sync_partial_state_room(
         self,
-        destination: str,
+        initial_destination: Optional[str],
+        other_destinations: Collection[str],
         room_id: str,
     ) -> None:
         """Background process to resync the state of a partial-state room
 
         Args:
-            destination: homeserver to pull the state from
+            initial_destination: the initial homeserver to pull the state from
+            other_destinations: other homeservers to try to pull the state from, if
+                `initial_destination` is unavailable
             room_id: room to be resynced
         """
 
@@ -1472,8 +1486,29 @@ class FederationHandler:
         #   really leave, that might mean we have difficulty getting the room state over
         #   federation.
         #
-        # TODO(faster_joins): try other destinations if the one we have fails
+        # TODO(faster_joins): we need some way of prioritising which homeservers in
+        #   `other_destinations` to try first, otherwise we'll spend ages trying dead
+        #   homeservers for large rooms.
+
+        if initial_destination is None and len(other_destinations) == 0:
+            raise ValueError(
+                f"Cannot resync state of {room_id}: no destinations provided"
+            )
 
+        # Make an infinite iterator of destinations to try. Once we find a working
+        # destination, we'll stick with it until it flakes.
+        if initial_destination is not None:
+            # Move `initial_destination` to the front of the list.
+            destinations = list(other_destinations)
+            if initial_destination in destinations:
+                destinations.remove(initial_destination)
+            destinations = [initial_destination] + destinations
+            destination_iter = itertools.cycle(destinations)
+        else:
+            destination_iter = itertools.cycle(other_destinations)
+
+        # `destination` is the current remote homeserver we're pulling from.
+        destination = next(destination_iter)
         logger.info("Syncing state for room %s via %s", room_id, destination)
 
         # we work through the queue in order of increasing stream ordering.
@@ -1511,6 +1546,41 @@ class FederationHandler:
                 allow_rejected=True,
             )
             for event in events:
-                await self._federation_event_handler.update_state_for_partial_state_event(
-                    destination, event
-                )
+                for attempt in itertools.count():
+                    try:
+                        await self._federation_event_handler.update_state_for_partial_state_event(
+                            destination, event
+                        )
+                        break
+                    except FederationError as e:
+                        if attempt == len(destinations) - 1:
+                            # We have tried every remote server for this event. Give up.
+                            # TODO(faster_joins) giving up isn't the right thing to do
+                            #   if there's a temporary network outage. retrying
+                            #   indefinitely is also not the right thing to do if we can
+                            #   reach all homeservers and they all claim they don't have
+                            #   the state we want.
+                            logger.error(
+                                "Failed to get state for %s at %s from %s because %s, "
+                                "giving up!",
+                                room_id,
+                                event,
+                                destination,
+                                e,
+                            )
+                            raise
+
+                        # Try the next remote server.
+                        logger.info(
+                            "Failed to get state for %s at %s from %s because %s",
+                            room_id,
+                            event,
+                            destination,
+                            e,
+                        )
+                        destination = next(destination_iter)
+                        logger.info(
+                            "Syncing state for room %s via %s instead",
+                            room_id,
+                            destination,
+                        )