diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index a837c18726..2dc1a2397d 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -43,6 +43,7 @@ from synapse.api.errors import (
Codes,
FederationDeniedError,
HttpResponseException,
+ RequestSendFailed,
SynapseError,
UnsupportedRoomVersionError,
)
@@ -111,6 +112,23 @@ class FederationClient(FederationBase):
reset_expiry_on_get=False,
)
+ # A cache for fetching the room hierarchy over federation.
+ #
+ # Some stale data over federation is OK, but must be refreshed
+ # periodically since the local server is in the room.
+ #
+ # It is a map of (room ID, suggested-only) -> the response of
+ # get_room_hierarchy.
+ self._get_room_hierarchy_cache: ExpiringCache[
+ Tuple[str, bool], Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]
+ ] = ExpiringCache(
+ cache_name="get_room_hierarchy_cache",
+ clock=self._clock,
+ max_len=1000,
+ expiry_ms=5 * 60 * 1000,
+ reset_expiry_on_get=False,
+ )
+
def _clear_tried_cache(self):
"""Clear pdu_destination_tried cache"""
now = self._clock.time_msec()
@@ -559,7 +577,11 @@ class FederationClient(FederationBase):
try:
return await callback(destination)
- except InvalidResponseError as e:
+ except (
+ RequestSendFailed,
+ InvalidResponseError,
+ NotRetryingDestination,
+ ) as e:
logger.warning("Failed to %s via %s: %s", description, destination, e)
except UnsupportedRoomVersionError:
raise
@@ -1112,7 +1134,8 @@ class FederationClient(FederationBase):
The response from the remote server.
Raises:
- HttpResponseException: There was an exception returned from the remote server
+ HttpResponseException / RequestSendFailed: There was an exception
+ returned from the remote server
SynapseException: M_FORBIDDEN when the remote server has disallowed publicRoom
requests over federation
@@ -1293,8 +1316,145 @@ class FederationClient(FederationBase):
failover_on_unknown_endpoint=True,
)
+ async def get_room_hierarchy(
+ self,
+ destinations: Iterable[str],
+ room_id: str,
+ suggested_only: bool,
+ ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]:
+ """
+ Call other servers to get a hierarchy of the given room.
+
+ Performs simple data validates and parsing of the response.
+
+ Args:
+ destinations: The remote servers. We will try them in turn, omitting any
+ that have been blacklisted.
+ room_id: ID of the space to be queried
+ suggested_only: If true, ask the remote server to only return children
+ with the "suggested" flag set
+
+ Returns:
+ A tuple of:
+ The room as a JSON dictionary.
+ A list of children rooms, as JSON dictionaries.
+ A list of inaccessible children room IDs.
+
+ Raises:
+ SynapseError if we were unable to get a valid summary from any of the
+ remote servers
+ """
+
+ cached_result = self._get_room_hierarchy_cache.get((room_id, suggested_only))
+ if cached_result:
+ return cached_result
-@attr.s(frozen=True, slots=True)
+ async def send_request(
+ destination: str,
+ ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]:
+ res = await self.transport_layer.get_room_hierarchy(
+ destination=destination,
+ room_id=room_id,
+ suggested_only=suggested_only,
+ )
+
+ room = res.get("room")
+ if not isinstance(room, dict):
+ raise InvalidResponseError("'room' must be a dict")
+
+ # Validate children_state of the room.
+ children_state = room.get("children_state", [])
+ if not isinstance(children_state, Sequence):
+ raise InvalidResponseError("'room.children_state' must be a list")
+ if any(not isinstance(e, dict) for e in children_state):
+ raise InvalidResponseError("Invalid event in 'children_state' list")
+ try:
+ [
+ FederationSpaceSummaryEventResult.from_json_dict(e)
+ for e in children_state
+ ]
+ except ValueError as e:
+ raise InvalidResponseError(str(e))
+
+ # Validate the children rooms.
+ children = res.get("children", [])
+ if not isinstance(children, Sequence):
+ raise InvalidResponseError("'children' must be a list")
+ if any(not isinstance(r, dict) for r in children):
+ raise InvalidResponseError("Invalid room in 'children' list")
+
+ # Validate the inaccessible children.
+ inaccessible_children = res.get("inaccessible_children", [])
+ if not isinstance(inaccessible_children, Sequence):
+ raise InvalidResponseError("'inaccessible_children' must be a list")
+ if any(not isinstance(r, str) for r in inaccessible_children):
+ raise InvalidResponseError(
+ "Invalid room ID in 'inaccessible_children' list"
+ )
+
+ return room, children, inaccessible_children
+
+ try:
+ result = await self._try_destination_list(
+ "fetch room hierarchy",
+ destinations,
+ send_request,
+ failover_on_unknown_endpoint=True,
+ )
+ except SynapseError as e:
+ # If an unexpected error occurred, re-raise it.
+ if e.code != 502:
+ raise
+
+ # Fallback to the old federation API and translate the results if
+ # no servers implement the new API.
+ #
+ # The algorithm below is a bit inefficient as it only attempts to
+ # parse information for the requested room, but the legacy API may
+ # return additional layers.
+ legacy_result = await self.get_space_summary(
+ destinations,
+ room_id,
+ suggested_only,
+ max_rooms_per_space=None,
+ exclude_rooms=[],
+ )
+
+ # Find the requested room in the response (and remove it).
+ for _i, room in enumerate(legacy_result.rooms):
+ if room.get("room_id") == room_id:
+ break
+ else:
+ # The requested room was not returned, nothing we can do.
+ raise
+ requested_room = legacy_result.rooms.pop(_i)
+
+ # Find any children events of the requested room.
+ children_events = []
+ children_room_ids = set()
+ for event in legacy_result.events:
+ if event.room_id == room_id:
+ children_events.append(event.data)
+ children_room_ids.add(event.state_key)
+ # And add them under the requested room.
+ requested_room["children_state"] = children_events
+
+ # Find the children rooms.
+ children = []
+ for room in legacy_result.rooms:
+ if room.get("room_id") in children_room_ids:
+ children.append(room)
+
+ # It isn't clear from the response whether some of the rooms are
+ # not accessible.
+ result = (requested_room, children, ())
+
+ # Cache the result to avoid fetching data over federation every time.
+ self._get_room_hierarchy_cache[(room_id, suggested_only)] = result
+ return result
+
+
+@attr.s(frozen=True, slots=True, auto_attribs=True)
class FederationSpaceSummaryEventResult:
"""Represents a single event in the result of a successful get_space_summary call.
@@ -1303,12 +1463,13 @@ class FederationSpaceSummaryEventResult:
object attributes.
"""
- event_type = attr.ib(type=str)
- state_key = attr.ib(type=str)
- via = attr.ib(type=Sequence[str])
+ event_type: str
+ room_id: str
+ state_key: str
+ via: Sequence[str]
# the raw data, including the above keys
- data = attr.ib(type=JsonDict)
+ data: JsonDict
@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult":
@@ -1325,6 +1486,10 @@ class FederationSpaceSummaryEventResult:
if not isinstance(event_type, str):
raise ValueError("Invalid event: 'event_type' must be a str")
+ room_id = d.get("room_id")
+ if not isinstance(room_id, str):
+ raise ValueError("Invalid event: 'room_id' must be a str")
+
state_key = d.get("state_key")
if not isinstance(state_key, str):
raise ValueError("Invalid event: 'state_key' must be a str")
@@ -1339,15 +1504,15 @@ class FederationSpaceSummaryEventResult:
if any(not isinstance(v, str) for v in via):
raise ValueError("Invalid event: 'via' must be a list of strings")
- return cls(event_type, state_key, via, d)
+ return cls(event_type, room_id, state_key, via, d)
-@attr.s(frozen=True, slots=True)
+@attr.s(frozen=True, slots=True, auto_attribs=True)
class FederationSpaceSummaryResult:
"""Represents the data returned by a successful get_space_summary call."""
- rooms = attr.ib(type=Sequence[JsonDict])
- events = attr.ib(type=Sequence[FederationSpaceSummaryEventResult])
+ rooms: List[JsonDict]
+ events: Sequence[FederationSpaceSummaryEventResult]
@classmethod
def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryResult":
@@ -1360,7 +1525,7 @@ class FederationSpaceSummaryResult:
ValueError if d is not a valid /spaces/ response
"""
rooms = d.get("rooms")
- if not isinstance(rooms, Sequence):
+ if not isinstance(rooms, List):
raise ValueError("'rooms' must be a list")
if any(not isinstance(r, dict) for r in rooms):
raise ValueError("Invalid room in 'rooms' list")
|