summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/federation.py149
1 files changed, 100 insertions, 49 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 986ffed3d5..b2784d7333 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
     Codes,
     FederationDeniedError,
     FederationError,
+    FederationPullAttemptBackoffError,
     HttpResponseException,
     LimitExceededError,
     NotFoundError,
@@ -69,8 +70,8 @@ from synapse.replication.http.federation import (
 )
 from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
-from synapse.storage.state import StateFilter
 from synapse.types import JsonDict, get_domain_from_id
+from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.visibility import filter_events_for_server
@@ -151,6 +152,7 @@ class FederationHandler:
         self._federation_event_handler = hs.get_federation_event_handler()
         self._device_handler = hs.get_device_handler()
         self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
+        self._notifier = hs.get_notifier()
 
         self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
             hs
@@ -378,6 +380,7 @@ class FederationHandler:
             filtered_extremities = await filter_events_for_server(
                 self._storage_controllers,
                 self.server_name,
+                self.server_name,
                 events_to_check,
                 redact=False,
                 check_history_visibility_only=True,
@@ -441,6 +444,15 @@ class FederationHandler:
                     # appropriate stuff.
                     # TODO: We can probably do something more intelligent here.
                     return True
+                except NotRetryingDestination as e:
+                    logger.info("_maybe_backfill_inner: %s", e)
+                    continue
+                except FederationDeniedError:
+                    logger.info(
+                        "_maybe_backfill_inner: Not attempting to backfill from %s because the homeserver is not on our federation whitelist",
+                        dom,
+                    )
+                    continue
                 except (SynapseError, InvalidResponseError) as e:
                     logger.info("Failed to backfill from %s because %s", dom, e)
                     continue
@@ -476,15 +488,9 @@ class FederationHandler:
 
                     logger.info("Failed to backfill from %s because %s", dom, e)
                     continue
-                except NotRetryingDestination as e:
-                    logger.info(str(e))
-                    continue
                 except RequestSendFailed as e:
                     logger.info("Failed to get backfill from %s because %s", dom, e)
                     continue
-                except FederationDeniedError as e:
-                    logger.info(e)
-                    continue
                 except Exception as e:
                     logger.exception("Failed to backfill from %s because %s", dom, e)
                     continue
@@ -631,6 +637,7 @@ class FederationHandler:
                     room_id=room_id,
                     servers=ret.servers_in_room,
                     device_lists_stream_id=self.store.get_device_stream_token(),
+                    joined_via=origin,
                 )
 
             try:
@@ -781,15 +788,27 @@ class FederationHandler:
 
         # Send the signed event back to the room, and potentially receive some
         # further information about the room in the form of partial state events
-        stripped_room_state = await self.federation_client.send_knock(
-            target_hosts, event
-        )
+        knock_response = await self.federation_client.send_knock(target_hosts, event)
 
         # Store any stripped room state events in the "unsigned" key of the event.
         # This is a bit of a hack and is cribbing off of invites. Basically we
         # store the room state here and retrieve it again when this event appears
         # in the invitee's sync stream. It is stripped out for all other local users.
-        event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"]
+        stripped_room_state = (
+            knock_response.get("knock_room_state")
+            # Since v1.37, Synapse incorrectly used "knock_state_events" for this field.
+            # Thus, we also check for a 'knock_state_events' to support old instances.
+            # See https://github.com/matrix-org/synapse/issues/14088.
+            or knock_response.get("knock_state_events")
+        )
+
+        if stripped_room_state is None:
+            raise KeyError(
+                "Missing 'knock_room_state' (or legacy 'knock_state_events') field in "
+                "send_knock response"
+            )
+
+        event.unsigned["knock_room_state"] = stripped_room_state
 
         context = EventContext.for_outlier(self._storage_controllers)
         stream_id = await self._federation_event_handler.persist_events_and_notify(
@@ -928,7 +947,7 @@ class FederationHandler:
 
         # The remote hasn't signed it yet, obviously. We'll do the full checks
         # when we get the event back in `on_send_join_request`
-        await self._event_auth_handler.check_auth_rules_from_context(event, context)
+        await self._event_auth_handler.check_auth_rules_from_context(event)
         return event
 
     async def on_invite_request(
@@ -1003,7 +1022,9 @@ class FederationHandler:
 
         context = EventContext.for_outlier(self._storage_controllers)
 
-        await self._bulk_push_rule_evaluator.action_for_event_by_user(event, context)
+        await self._bulk_push_rule_evaluator.action_for_events_by_user(
+            [(event, context)]
+        )
         try:
             await self._federation_event_handler.persist_events_and_notify(
                 event.room_id, [(event, context)]
@@ -1109,7 +1130,7 @@ class FederationHandler:
         try:
             # The remote hasn't signed it yet, obviously. We'll do the full checks
             # when we get the event back in `on_send_leave_request`
-            await self._event_auth_handler.check_auth_rules_from_context(event, context)
+            await self._event_auth_handler.check_auth_rules_from_context(event)
         except AuthError as e:
             logger.warning("Failed to create new leave %r because %s", event, e)
             raise e
@@ -1168,7 +1189,7 @@ class FederationHandler:
         try:
             # The remote hasn't signed it yet, obviously. We'll do the full checks
             # when we get the event back in `on_send_knock_request`
-            await self._event_auth_handler.check_auth_rules_from_context(event, context)
+            await self._event_auth_handler.check_auth_rules_from_context(event)
         except AuthError as e:
             logger.warning("Failed to create new knock %r because %s", event, e)
             raise e
@@ -1212,7 +1233,9 @@ class FederationHandler:
     async def on_backfill_request(
         self, origin: str, room_id: str, pdu_list: List[str], limit: int
     ) -> List[EventBase]:
-        await self._event_auth_handler.assert_host_in_room(room_id, origin)
+        # We allow partially joined rooms since in this case we are filtering out
+        # non-local events in `filter_events_for_server`.
+        await self._event_auth_handler.assert_host_in_room(room_id, origin, True)
 
         # Synapse asks for 100 events per backfill request. Do not allow more.
         limit = min(limit, 100)
@@ -1233,7 +1256,7 @@ class FederationHandler:
         )
 
         events = await filter_events_for_server(
-            self._storage_controllers, origin, events
+            self._storage_controllers, origin, self.server_name, events
         )
 
         return events
@@ -1264,7 +1287,7 @@ class FederationHandler:
         await self._event_auth_handler.assert_host_in_room(event.room_id, origin)
 
         events = await filter_events_for_server(
-            self._storage_controllers, origin, [event]
+            self._storage_controllers, origin, self.server_name, [event]
         )
         event = events[0]
         return event
@@ -1277,7 +1300,9 @@ class FederationHandler:
         latest_events: List[str],
         limit: int,
     ) -> List[EventBase]:
-        await self._event_auth_handler.assert_host_in_room(room_id, origin)
+        # We allow partially joined rooms since in this case we are filtering out
+        # non-local events in `filter_events_for_server`.
+        await self._event_auth_handler.assert_host_in_room(room_id, origin, True)
 
         # Only allow up to 20 events to be retrieved per request.
         limit = min(limit, 20)
@@ -1290,7 +1315,7 @@ class FederationHandler:
         )
 
         missing_events = await filter_events_for_server(
-            self._storage_controllers, origin, missing_events
+            self._storage_controllers, origin, self.server_name, missing_events
         )
 
         return missing_events
@@ -1334,9 +1359,7 @@ class FederationHandler:
 
             try:
                 validate_event_for_room_version(event)
-                await self._event_auth_handler.check_auth_rules_from_context(
-                    event, context
-                )
+                await self._event_auth_handler.check_auth_rules_from_context(event)
             except AuthError as e:
                 logger.warning("Denying new third party invite %r because %s", event, e)
                 raise e
@@ -1386,7 +1409,7 @@ class FederationHandler:
 
         try:
             validate_event_for_room_version(event)
-            await self._event_auth_handler.check_auth_rules_from_context(event, context)
+            await self._event_auth_handler.check_auth_rules_from_context(event)
         except AuthError as e:
             logger.warning("Denying third party invite %r because %s", event, e)
             raise e
@@ -1579,8 +1602,8 @@ class FederationHandler:
         Fetch the complexity of a remote room over federation.
 
         Args:
-            remote_room_hosts (list[str]): The remote servers to ask.
-            room_id (str): The room ID to ask about.
+            remote_room_hosts: The remote servers to ask.
+            room_id: The room ID to ask about.
 
         Returns:
             Dict contains the complexity
@@ -1602,13 +1625,13 @@ class FederationHandler:
         """Resumes resyncing of all partial-state rooms after a restart."""
         assert not self.config.worker.worker_app
 
-        partial_state_rooms = await self.store.get_partial_state_rooms_and_servers()
-        for room_id, servers_in_room in partial_state_rooms.items():
+        partial_state_rooms = await self.store.get_partial_state_room_resync_info()
+        for room_id, resync_info in partial_state_rooms.items():
             run_as_background_process(
                 desc="sync_partial_state_room",
                 func=self._sync_partial_state_room,
-                initial_destination=None,
-                other_destinations=servers_in_room,
+                initial_destination=resync_info.joined_via,
+                other_destinations=resync_info.servers_in_room,
                 room_id=room_id,
             )
 
@@ -1637,28 +1660,12 @@ class FederationHandler:
         #   really leave, that might mean we have difficulty getting the room state over
         #   federation.
         #   https://github.com/matrix-org/synapse/issues/12802
-        #
-        # TODO(faster_joins): we need some way of prioritising which homeservers in
-        #   `other_destinations` to try first, otherwise we'll spend ages trying dead
-        #   homeservers for large rooms.
-        #   https://github.com/matrix-org/synapse/issues/12999
-
-        if initial_destination is None and len(other_destinations) == 0:
-            raise ValueError(
-                f"Cannot resync state of {room_id}: no destinations provided"
-            )
 
         # Make an infinite iterator of destinations to try. Once we find a working
         # destination, we'll stick with it until it flakes.
-        destinations: Collection[str]
-        if initial_destination is not None:
-            # Move `initial_destination` to the front of the list.
-            destinations = list(other_destinations)
-            if initial_destination in destinations:
-                destinations.remove(initial_destination)
-            destinations = [initial_destination] + destinations
-        else:
-            destinations = other_destinations
+        destinations = _prioritise_destinations_for_partial_state_resync(
+            initial_destination, other_destinations, room_id
+        )
         destination_iter = itertools.cycle(destinations)
 
         # `destination` is the current remote homeserver we're pulling from.
@@ -1686,6 +1693,9 @@ class FederationHandler:
                     self._storage_controllers.state.notify_room_un_partial_stated(
                         room_id
                     )
+                    # Poke the notifier so that other workers see the write to
+                    # the un-partial-stated rooms stream.
+                    self._notifier.notify_replication()
 
                     # TODO(faster_joins) update room stats and user directory?
                     #   https://github.com/matrix-org/synapse/issues/12814
@@ -1708,7 +1718,22 @@ class FederationHandler:
                             destination, event
                         )
                         break
+                    except FederationPullAttemptBackoffError as exc:
+                        # Log a warning about why we failed to process the event (the error message
+                        # for `FederationPullAttemptBackoffError` is pretty good)
+                        logger.warning("_sync_partial_state_room: %s", exc)
+                        # We do not record a failed pull attempt when we backoff fetching a missing
+                        # `prev_event` because not being able to fetch the `prev_events` just means
+                        # we won't be able to de-outlier the pulled event. But we can still use an
+                        # `outlier` in the state/auth chain for another event. So we shouldn't stop
+                        # a downstream event from trying to pull it.
+                        #
+                        # This avoids a cascade of backoff for all events in the DAG downstream from
+                        # one event backoff upstream.
                     except FederationError as e:
+                        # TODO: We should `record_event_failed_pull_attempt` here,
+                        #   see https://github.com/matrix-org/synapse/issues/13700
+
                         if attempt == len(destinations) - 1:
                             # We have tried every remote server for this event. Give up.
                             # TODO(faster_joins) giving up isn't the right thing to do
@@ -1741,3 +1766,29 @@ class FederationHandler:
                             room_id,
                             destination,
                         )
+
+
+def _prioritise_destinations_for_partial_state_resync(
+    initial_destination: Optional[str],
+    other_destinations: Collection[str],
+    room_id: str,
+) -> Collection[str]:
+    """Work out the order in which we should ask servers to resync events.
+
+    If an `initial_destination` is given, it takes top priority. Otherwise
+    all servers are treated equally.
+
+    :raises ValueError: if no destination is provided at all.
+    """
+    if initial_destination is None and len(other_destinations) == 0:
+        raise ValueError(f"Cannot resync state of {room_id}: no destinations provided")
+
+    if initial_destination is None:
+        return other_destinations
+
+    # Move `initial_destination` to the front of the list.
+    destinations = list(other_destinations)
+    if initial_destination in destinations:
+        destinations.remove(initial_destination)
+    destinations = [initial_destination] + destinations
+    return destinations