From 3b354faad0e6b1f41ed5dd0269a1785d3f505465 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 5 Aug 2021 08:39:17 -0400 Subject: Refactoring before implementing the updated spaces summary. (#10527) This should have no user-visible changes, but refactors some pieces of the SpaceSummaryHandler before adding support for the updated MSC2946. --- synapse/federation/federation_client.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b7a10da15a..007d1a27dc 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1290,7 +1290,7 @@ class FederationClient(FederationBase): ) -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class FederationSpaceSummaryEventResult: """Represents a single event in the result of a successful get_space_summary call. @@ -1299,12 +1299,13 @@ class FederationSpaceSummaryEventResult: object attributes. 
""" - event_type = attr.ib(type=str) - state_key = attr.ib(type=str) - via = attr.ib(type=Sequence[str]) + event_type: str + room_id: str + state_key: str + via: Sequence[str] # the raw data, including the above keys - data = attr.ib(type=JsonDict) + data: JsonDict @classmethod def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult": @@ -1321,6 +1322,10 @@ class FederationSpaceSummaryEventResult: if not isinstance(event_type, str): raise ValueError("Invalid event: 'event_type' must be a str") + room_id = d.get("room_id") + if not isinstance(room_id, str): + raise ValueError("Invalid event: 'room_id' must be a str") + state_key = d.get("state_key") if not isinstance(state_key, str): raise ValueError("Invalid event: 'state_key' must be a str") @@ -1335,15 +1340,15 @@ class FederationSpaceSummaryEventResult: if any(not isinstance(v, str) for v in via): raise ValueError("Invalid event: 'via' must be a list of strings") - return cls(event_type, state_key, via, d) + return cls(event_type, room_id, state_key, via, d) -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class FederationSpaceSummaryResult: """Represents the data returned by a successful get_space_summary call.""" - rooms = attr.ib(type=Sequence[JsonDict]) - events = attr.ib(type=Sequence[FederationSpaceSummaryEventResult]) + rooms: Sequence[JsonDict] + events: Sequence[FederationSpaceSummaryEventResult] @classmethod def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryResult": -- cgit 1.5.1 From 60f0534b6e910a497800da2454638bcf4aae006e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 Aug 2021 14:05:41 +0100 Subject: Fix exceptions in logs when failing to get remote room list (#10541) --- changelog.d/10541.bugfix | 1 + synapse/federation/federation_client.py | 3 +- synapse/handlers/room_list.py | 46 ++++++++++------- synapse/rest/client/v1/room.py | 30 +++++------ tests/rest/client/v1/test_rooms.py | 92 ++++++++++++++++++++++++++++++++- 
5 files changed, 134 insertions(+), 38 deletions(-) create mode 100644 changelog.d/10541.bugfix (limited to 'synapse/federation/federation_client.py') diff --git a/changelog.d/10541.bugfix b/changelog.d/10541.bugfix new file mode 100644 index 0000000000..bb946e0920 --- /dev/null +++ b/changelog.d/10541.bugfix @@ -0,0 +1 @@ +Fix exceptions in logs when failing to get remote room list. diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 007d1a27dc..2eefac04fd 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1108,7 +1108,8 @@ class FederationClient(FederationBase): The response from the remote server. Raises: - HttpResponseException: There was an exception returned from the remote server + HttpResponseException / RequestSendFailed: There was an exception + returned from the remote server SynapseException: M_FORBIDDEN when the remote server has disallowed publicRoom requests over federation diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index fae2c098e3..6d433fad41 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -356,6 +356,12 @@ class RoomListHandler(BaseHandler): include_all_networks: bool = False, third_party_instance_id: Optional[str] = None, ) -> JsonDict: + """Get the public room list from remote server + + Raises: + SynapseError + """ + if not self.enable_room_list_search: return {"chunk": [], "total_room_count_estimate": 0} @@ -395,13 +401,16 @@ class RoomListHandler(BaseHandler): limit = None since_token = None - res = await self._get_remote_list_cached( - server_name, - limit=limit, - since_token=since_token, - include_all_networks=include_all_networks, - third_party_instance_id=third_party_instance_id, - ) + try: + res = await self._get_remote_list_cached( + server_name, + limit=limit, + since_token=since_token, + include_all_networks=include_all_networks, + 
third_party_instance_id=third_party_instance_id, + ) + except (RequestSendFailed, HttpResponseException): + raise SynapseError(502, "Failed to fetch room list") if search_filter: res = { @@ -423,20 +432,21 @@ class RoomListHandler(BaseHandler): include_all_networks: bool = False, third_party_instance_id: Optional[str] = None, ) -> JsonDict: + """Wrapper around FederationClient.get_public_rooms that caches the + result. + """ + repl_layer = self.hs.get_federation_client() if search_filter: # We can't cache when asking for search - try: - return await repl_layer.get_public_rooms( - server_name, - limit=limit, - since_token=since_token, - search_filter=search_filter, - include_all_networks=include_all_networks, - third_party_instance_id=third_party_instance_id, - ) - except (RequestSendFailed, HttpResponseException): - raise SynapseError(502, "Failed to fetch room list") + return await repl_layer.get_public_rooms( + server_name, + limit=limit, + since_token=since_token, + search_filter=search_filter, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, + ) key = ( server_name, diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 982f134148..f887970b76 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -23,7 +23,6 @@ from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.api.errors import ( AuthError, Codes, - HttpResponseException, InvalidClientCredentialsError, ShadowBanError, SynapseError, @@ -783,12 +782,9 @@ class PublicRoomListRestServlet(TransactionRestServlet): Codes.INVALID_PARAM, ) - try: - data = await handler.get_remote_public_room_list( - server, limit=limit, since_token=since_token - ) - except HttpResponseException as e: - raise e.to_synapse_error() + data = await handler.get_remote_public_room_list( + server, limit=limit, since_token=since_token + ) else: data = await handler.get_local_public_room_list( 
limit=limit, since_token=since_token @@ -836,17 +832,15 @@ class PublicRoomListRestServlet(TransactionRestServlet): Codes.INVALID_PARAM, ) - try: - data = await handler.get_remote_public_room_list( - server, - limit=limit, - since_token=since_token, - search_filter=search_filter, - include_all_networks=include_all_networks, - third_party_instance_id=third_party_instance_id, - ) - except HttpResponseException as e: - raise e.to_synapse_error() + data = await handler.get_remote_public_room_list( + server, + limit=limit, + since_token=since_token, + search_filter=search_filter, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, + ) + else: data = await handler.get_local_public_room_list( limit=limit, diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 3df070c936..1a9528ec20 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -19,11 +19,14 @@ import json from typing import Iterable -from unittest.mock import Mock +from unittest.mock import Mock, call from urllib import parse as urlparse +from twisted.internet import defer + import synapse.rest.admin from synapse.api.constants import EventContentFields, EventTypes, Membership +from synapse.api.errors import HttpResponseException from synapse.handlers.pagination import PurgeStatus from synapse.rest import admin from synapse.rest.client.v1 import directory, login, profile, room @@ -1124,6 +1127,93 @@ class PublicRoomsRestrictedTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200, channel.result) +class PublicRoomsTestRemoteSearchFallbackTestCase(unittest.HomeserverTestCase): + """Test that we correctly fallback to local filtering if a remote server + doesn't support search. 
+ """ + + servlets = [ + synapse.rest.admin.register_servlets_for_client_rest_resource, + room.register_servlets, + login.register_servlets, + ] + + def make_homeserver(self, reactor, clock): + return self.setup_test_homeserver(federation_client=Mock()) + + def prepare(self, reactor, clock, hs): + self.register_user("user", "pass") + self.token = self.login("user", "pass") + + self.federation_client = hs.get_federation_client() + + def test_simple(self): + "Simple test for searching rooms over federation" + self.federation_client.get_public_rooms.side_effect = ( + lambda *a, **k: defer.succeed({}) + ) + + search_filter = {"generic_search_term": "foobar"} + + channel = self.make_request( + "POST", + b"/_matrix/client/r0/publicRooms?server=testserv", + content={"filter": search_filter}, + access_token=self.token, + ) + self.assertEqual(channel.code, 200, channel.result) + + self.federation_client.get_public_rooms.assert_called_once_with( + "testserv", + limit=100, + since_token=None, + search_filter=search_filter, + include_all_networks=False, + third_party_instance_id=None, + ) + + def test_fallback(self): + "Test that searching public rooms over federation falls back if it gets a 404" + + # The `get_public_rooms` should be called again if the first call fails + # with a 404, when using search filters. 
+ self.federation_client.get_public_rooms.side_effect = ( + HttpResponseException(404, "Not Found", b""), + defer.succeed({}), + ) + + search_filter = {"generic_search_term": "foobar"} + + channel = self.make_request( + "POST", + b"/_matrix/client/r0/publicRooms?server=testserv", + content={"filter": search_filter}, + access_token=self.token, + ) + self.assertEqual(channel.code, 200, channel.result) + + self.federation_client.get_public_rooms.assert_has_calls( + [ + call( + "testserv", + limit=100, + since_token=None, + search_filter=search_filter, + include_all_networks=False, + third_party_instance_id=None, + ), + call( + "testserv", + limit=None, + since_token=None, + search_filter=None, + include_all_networks=False, + third_party_instance_id=None, + ), + ] + ) + + class PerRoomProfilesForbiddenTestCase(unittest.HomeserverTestCase): servlets = [ -- cgit 1.5.1 From 7de445161f2fec115ce8518cde7a3b333a611f16 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 16 Aug 2021 08:06:17 -0400 Subject: Support federation in the new spaces summary API (MSC2946). (#10569) --- changelog.d/10569.feature | 1 + synapse/federation/federation_client.py | 82 +++++++++ synapse/federation/transport/client.py | 22 +++ synapse/federation/transport/server.py | 28 +++ synapse/handlers/space_summary.py | 258 +++++++++++++++++++++++----- tests/handlers/test_space_summary.py | 292 ++++++++++++++++++-------------- 6 files changed, 518 insertions(+), 165 deletions(-) create mode 100644 changelog.d/10569.feature (limited to 'synapse/federation/federation_client.py') diff --git a/changelog.d/10569.feature b/changelog.d/10569.feature new file mode 100644 index 0000000000..ffc4e4289c --- /dev/null +++ b/changelog.d/10569.feature @@ -0,0 +1 @@ +Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946). 
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 2eefac04fd..0af953a5d6 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1290,6 +1290,88 @@ class FederationClient(FederationBase): failover_on_unknown_endpoint=True, ) + async def get_room_hierarchy( + self, + destinations: Iterable[str], + room_id: str, + suggested_only: bool, + ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]: + """ + Call other servers to get a hierarchy of the given room. + + Performs simple data validates and parsing of the response. + + Args: + destinations: The remote servers. We will try them in turn, omitting any + that have been blacklisted. + room_id: ID of the space to be queried + suggested_only: If true, ask the remote server to only return children + with the "suggested" flag set + + Returns: + A tuple of: + The room as a JSON dictionary. + A list of children rooms, as JSON dictionaries. + A list of inaccessible children room IDs. + + Raises: + SynapseError if we were unable to get a valid summary from any of the + remote servers + """ + + async def send_request( + destination: str, + ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]: + res = await self.transport_layer.get_room_hierarchy( + destination=destination, + room_id=room_id, + suggested_only=suggested_only, + ) + + room = res.get("room") + if not isinstance(room, dict): + raise InvalidResponseError("'room' must be a dict") + + # Validate children_state of the room. 
+ children_state = room.get("children_state", []) + if not isinstance(children_state, Sequence): + raise InvalidResponseError("'room.children_state' must be a list") + if any(not isinstance(e, dict) for e in children_state): + raise InvalidResponseError("Invalid event in 'children_state' list") + try: + [ + FederationSpaceSummaryEventResult.from_json_dict(e) + for e in children_state + ] + except ValueError as e: + raise InvalidResponseError(str(e)) + + # Validate the children rooms. + children = res.get("children", []) + if not isinstance(children, Sequence): + raise InvalidResponseError("'children' must be a list") + if any(not isinstance(r, dict) for r in children): + raise InvalidResponseError("Invalid room in 'children' list") + + # Validate the inaccessible children. + inaccessible_children = res.get("inaccessible_children", []) + if not isinstance(inaccessible_children, Sequence): + raise InvalidResponseError("'inaccessible_children' must be a list") + if any(not isinstance(r, str) for r in inaccessible_children): + raise InvalidResponseError( + "Invalid room ID in 'inaccessible_children' list" + ) + + return room, children, inaccessible_children + + # TODO Fallback to the old federation API and translate the results. 
+ return await self._try_destination_list( + "fetch room hierarchy", + destinations, + send_request, + failover_on_unknown_endpoint=True, + ) + @attr.s(frozen=True, slots=True, auto_attribs=True) class FederationSpaceSummaryEventResult: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 90a7c16b62..8b247fe206 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -1177,6 +1177,28 @@ class TransportLayerClient: destination=destination, path=path, data=params ) + async def get_room_hierarchy( + self, + destination: str, + room_id: str, + suggested_only: bool, + ) -> JsonDict: + """ + Args: + destination: The remote server + room_id: The room ID to ask about. + suggested_only: if True, only suggested rooms will be returned + """ + path = _create_path( + FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/hierarchy/%s", room_id + ) + + return await self.client.get_json( + destination=destination, + path=path, + args={"suggested_only": "true" if suggested_only else "false"}, + ) + def _create_path(federation_prefix: str, path: str, *args: str) -> str: """ diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 640f46fff6..79a2e1afa0 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1936,6 +1936,33 @@ class FederationSpaceSummaryServlet(BaseFederationServlet): ) +class FederationRoomHierarchyServlet(BaseFederationServlet): + PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946" + PATH = "/hierarchy/(?P[^/]*)" + + def __init__( + self, + hs: HomeServer, + authenticator: Authenticator, + ratelimiter: FederationRateLimiter, + server_name: str, + ): + super().__init__(hs, authenticator, ratelimiter, server_name) + self.handler = hs.get_space_summary_handler() + + async def on_GET( + self, + origin: str, + content: Literal[None], + query: Mapping[bytes, Sequence[bytes]], + room_id: str, + 
) -> Tuple[int, JsonDict]: + suggested_only = parse_boolean_from_args(query, "suggested_only", default=False) + return 200, await self.handler.get_federation_hierarchy( + origin, room_id, suggested_only + ) + + class RoomComplexityServlet(BaseFederationServlet): """ Indicates to other servers how complex (and therefore likely @@ -1999,6 +2026,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationVersionServlet, RoomComplexityServlet, FederationSpaceSummaryServlet, + FederationRoomHierarchyServlet, FederationV1SendKnockServlet, FederationMakeKnockServlet, ) diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py index d0060f9046..c74e90abbc 100644 --- a/synapse/handlers/space_summary.py +++ b/synapse/handlers/space_summary.py @@ -16,17 +16,7 @@ import itertools import logging import re from collections import deque -from typing import ( - TYPE_CHECKING, - Deque, - Dict, - Iterable, - List, - Optional, - Sequence, - Set, - Tuple, -) +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Set, Tuple import attr @@ -80,7 +70,7 @@ class _PaginationSession: # The time the pagination session was created, in milliseconds. creation_time_ms: int # The queue of rooms which are still to process. - room_queue: Deque["_RoomQueueEntry"] + room_queue: List["_RoomQueueEntry"] # A set of rooms which have been processed. processed_rooms: Set[str] @@ -197,7 +187,7 @@ class SpaceSummaryHandler: events: Sequence[JsonDict] = [] if room_entry: rooms_result.append(room_entry.room) - events = room_entry.children + events = room_entry.children_state_events logger.debug( "Query of local room %s returned events %s", @@ -232,7 +222,7 @@ class SpaceSummaryHandler: room.pop("allowed_spaces", None) rooms_result.append(room) - events.extend(room_entry.children) + events.extend(room_entry.children_state_events) # All rooms returned don't need visiting again (even if the user # didn't have access to them). 
@@ -350,8 +340,8 @@ class SpaceSummaryHandler: room_queue = pagination_session.room_queue processed_rooms = pagination_session.processed_rooms else: - # the queue of rooms to process - room_queue = deque((_RoomQueueEntry(requested_room_id, ()),)) + # The queue of rooms to process, the next room is last on the stack. + room_queue = [_RoomQueueEntry(requested_room_id, ())] # Rooms we have already processed. processed_rooms = set() @@ -367,7 +357,7 @@ class SpaceSummaryHandler: # Iterate through the queue until we reach the limit or run out of # rooms to include. while room_queue and len(rooms_result) < limit: - queue_entry = room_queue.popleft() + queue_entry = room_queue.pop() room_id = queue_entry.room_id current_depth = queue_entry.depth if room_id in processed_rooms: @@ -376,6 +366,18 @@ class SpaceSummaryHandler: logger.debug("Processing room %s", room_id) + # A map of summaries for children rooms that might be returned over + # federation. The rationale for caching these and *maybe* using them + # is to prefer any information local to the homeserver before trusting + # data received over federation. + children_room_entries: Dict[str, JsonDict] = {} + # A set of room IDs which are children that did not have information + # returned over federation and are known to be inaccessible to the + # current server. We should not reach out over federation to try to + # summarise these rooms. + inaccessible_children: Set[str] = set() + + # If the room is known locally, summarise it! is_in_room = await self._store.is_host_joined(room_id, self._server_name) if is_in_room: room_entry = await self._summarize_local_room( @@ -387,26 +389,68 @@ class SpaceSummaryHandler: max_children=None, ) - if room_entry: - rooms_result.append(room_entry.as_json()) - - # Add the child to the queue. We have already validated - # that the vias are a list of server names. - # - # If the current depth is the maximum depth, do not queue - # more entries. 
- if max_depth is None or current_depth < max_depth: - room_queue.extendleft( - _RoomQueueEntry( - ev["state_key"], ev["content"]["via"], current_depth + 1 - ) - for ev in reversed(room_entry.children) - ) - - processed_rooms.add(room_id) + # Otherwise, attempt to use information for federation. else: - # TODO Federation. - pass + # A previous call might have included information for this room. + # It can be used if either: + # + # 1. The room is not a space. + # 2. The maximum depth has been achieved (since no children + # information is needed). + if queue_entry.remote_room and ( + queue_entry.remote_room.get("room_type") != RoomTypes.SPACE + or (max_depth is not None and current_depth >= max_depth) + ): + room_entry = _RoomEntry( + queue_entry.room_id, queue_entry.remote_room + ) + + # If the above isn't true, attempt to fetch the room + # information over federation. + else: + ( + room_entry, + children_room_entries, + inaccessible_children, + ) = await self._summarize_remote_room_hiearchy( + queue_entry, + suggested_only, + ) + + # Ensure this room is accessible to the requester (and not just + # the homeserver). + if room_entry and not await self._is_remote_room_accessible( + requester, queue_entry.room_id, room_entry.room + ): + room_entry = None + + # This room has been processed and should be ignored if it appears + # elsewhere in the hierarchy. + processed_rooms.add(room_id) + + # There may or may not be a room entry based on whether it is + # inaccessible to the requesting user. + if room_entry: + # Add the room (including the stripped m.space.child events). + rooms_result.append(room_entry.as_json()) + + # If this room is not at the max-depth, check if there are any + # children to process. + if max_depth is None or current_depth < max_depth: + # The children get added in reverse order so that the next + # room to process, according to the ordering, is the last + # item in the list. 
+ room_queue.extend( + _RoomQueueEntry( + ev["state_key"], + ev["content"]["via"], + current_depth + 1, + children_room_entries.get(ev["state_key"]), + ) + for ev in reversed(room_entry.children_state_events) + if ev["type"] == EventTypes.SpaceChild + and ev["state_key"] not in inaccessible_children + ) result: JsonDict = {"rooms": rooms_result} @@ -477,15 +521,78 @@ class SpaceSummaryHandler: if room_entry: rooms_result.append(room_entry.room) - events_result.extend(room_entry.children) + events_result.extend(room_entry.children_state_events) # add any children to the queue room_queue.extend( - edge_event["state_key"] for edge_event in room_entry.children + edge_event["state_key"] + for edge_event in room_entry.children_state_events ) return {"rooms": rooms_result, "events": events_result} + async def get_federation_hierarchy( + self, + origin: str, + requested_room_id: str, + suggested_only: bool, + ): + """ + Implementation of the room hierarchy Federation API. + + This is similar to get_room_hierarchy, but does not recurse into the space. + It also considers whether anyone on the server may be able to access the + room, as opposed to whether a specific user can. + + Args: + origin: The server requesting the spaces summary. + requested_room_id: The room ID to start the hierarchy at (the "root" room). + suggested_only: whether we should only return children with the "suggested" + flag set. + + Returns: + The JSON hierarchy dictionary. + """ + root_room_entry = await self._summarize_local_room( + None, origin, requested_room_id, suggested_only, max_children=None + ) + if root_room_entry is None: + # Room is inaccessible to the requesting server. + raise SynapseError(404, "Unknown room: %s" % (requested_room_id,)) + + children_rooms_result: List[JsonDict] = [] + inaccessible_children: List[str] = [] + + # Iterate through each child and potentially add it, but not its children, + # to the response. 
+ for child_room in root_room_entry.children_state_events: + room_id = child_room.get("state_key") + assert isinstance(room_id, str) + # If the room is unknown, skip it. + if not await self._store.is_host_joined(room_id, self._server_name): + continue + + room_entry = await self._summarize_local_room( + None, origin, room_id, suggested_only, max_children=0 + ) + # If the room is accessible, include it in the results. + # + # Note that only the room summary (without information on children) + # is included in the summary. + if room_entry: + children_rooms_result.append(room_entry.room) + # Otherwise, note that the requesting server shouldn't bother + # trying to summarize this room - they do not have access to it. + else: + inaccessible_children.append(room_id) + + return { + # Include the requested room (including the stripped children events). + "room": root_room_entry.as_json(), + "children": children_rooms_result, + "inaccessible_children": inaccessible_children, + } + async def _summarize_local_room( self, requester: Optional[str], @@ -519,8 +626,9 @@ class SpaceSummaryHandler: room_entry = await self._build_room_entry(room_id, for_federation=bool(origin)) - # If the room is not a space, return just the room information. - if room_entry.get("room_type") != RoomTypes.SPACE: + # If the room is not a space or the children don't matter, return just + # the room information. + if room_entry.get("room_type") != RoomTypes.SPACE or max_children == 0: return _RoomEntry(room_id, room_entry) # Otherwise, look for child rooms/spaces. @@ -616,6 +724,59 @@ class SpaceSummaryHandler: return results + async def _summarize_remote_room_hiearchy( + self, room: "_RoomQueueEntry", suggested_only: bool + ) -> Tuple[Optional["_RoomEntry"], Dict[str, JsonDict], Set[str]]: + """ + Request room entries and a list of event entries for a given room by querying a remote server. + + Args: + room: The room to summarize. + suggested_only: True if only suggested children should be returned. 
+ Otherwise, all children are returned. + + Returns: + A tuple of: + The room entry. + Partial room data return over federation. + A set of inaccessible children room IDs. + """ + room_id = room.room_id + logger.info("Requesting summary for %s via %s", room_id, room.via) + + via = itertools.islice(room.via, MAX_SERVERS_PER_SPACE) + try: + ( + room_response, + children, + inaccessible_children, + ) = await self._federation_client.get_room_hierarchy( + via, + room_id, + suggested_only=suggested_only, + ) + except Exception as e: + logger.warning( + "Unable to get hierarchy of %s via federation: %s", + room_id, + e, + exc_info=logger.isEnabledFor(logging.DEBUG), + ) + return None, {}, set() + + # Map the children to their room ID. + children_by_room_id = { + c["room_id"]: c + for c in children + if "room_id" in c and isinstance(c["room_id"], str) + } + + return ( + _RoomEntry(room_id, room_response, room_response.pop("children_state", ())), + children_by_room_id, + set(inaccessible_children), + ) + async def _is_local_room_accessible( self, room_id: str, requester: Optional[str], origin: Optional[str] = None ) -> bool: @@ -866,9 +1027,16 @@ class SpaceSummaryHandler: @attr.s(frozen=True, slots=True, auto_attribs=True) class _RoomQueueEntry: + # The room ID of this entry. room_id: str + # The server to query if the room is not known locally. via: Sequence[str] + # The minimum number of hops necessary to get to this room (compared to the + # originally requested room). depth: int = 0 + # The room summary for this room returned via federation. This will only be + # used if the room is not known locally (and is not a space). + remote_room: Optional[JsonDict] = None @attr.s(frozen=True, slots=True, auto_attribs=True) @@ -879,11 +1047,17 @@ class _RoomEntry: # An iterable of the sorted, stripped children events for children of this room. # # This may not include all children. 
- children: Sequence[JsonDict] = () + children_state_events: Sequence[JsonDict] = () def as_json(self) -> JsonDict: + """ + Returns a JSON dictionary suitable for the room hierarchy endpoint. + + It returns the room summary including the stripped m.space.child events + as a sub-key. + """ result = dict(self.room) - result["children_state"] = self.children + result["children_state"] = self.children_state_events return result diff --git a/tests/handlers/test_space_summary.py b/tests/handlers/test_space_summary.py index 83c2bdd8f9..bc8e131f4a 100644 --- a/tests/handlers/test_space_summary.py +++ b/tests/handlers/test_space_summary.py @@ -481,7 +481,7 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): self.assertNotIn("next_batch", result) def test_invalid_pagination_token(self): - """""" + """An invalid pagination token, or changing other parameters, shoudl be rejected.""" room_ids = [] for i in range(1, 10): room = self.helper.create_room_as(self.user, tok=self.token) @@ -581,33 +581,40 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): subspace = "#subspace:" + fed_hostname subroom = "#subroom:" + fed_hostname + # Generate some good data, and some bad data: + # + # * Event *back* to the root room. + # * Unrelated events / rooms + # * Multiple levels of events (in a not-useful order, e.g. grandchild + # events before child events). + + # Note that these entries are brief, but should contain enough info. + requested_room_entry = _RoomEntry( + subspace, + { + "room_id": subspace, + "world_readable": True, + "room_type": RoomTypes.SPACE, + }, + [ + { + "type": EventTypes.SpaceChild, + "room_id": subspace, + "state_key": subroom, + "content": {"via": [fed_hostname]}, + } + ], + ) + child_room = { + "room_id": subroom, + "world_readable": True, + } + async def summarize_remote_room( _self, room, suggested_only, max_children, exclude_rooms ): - # Return some good data, and some bad data: - # - # * Event *back* to the root room. 
- # * Unrelated events / rooms - # * Multiple levels of events (in a not-useful order, e.g. grandchild - # events before child events). - - # Note that these entries are brief, but should contain enough info. return [ - _RoomEntry( - subspace, - { - "room_id": subspace, - "world_readable": True, - "room_type": RoomTypes.SPACE, - }, - [ - { - "room_id": subspace, - "state_key": subroom, - "content": {"via": [fed_hostname]}, - } - ], - ), + requested_room_entry, _RoomEntry( subroom, { @@ -617,6 +624,9 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): ), ] + async def summarize_remote_room_hiearchy(_self, room, suggested_only): + return requested_room_entry, {subroom: child_room}, set() + # Add a room to the space which is on another server. self._add_child(self.space, subspace, self.token) @@ -636,6 +646,15 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): ] self._assert_rooms(result, expected) + with mock.patch( + "synapse.handlers.space_summary.SpaceSummaryHandler._summarize_remote_room_hiearchy", + new=summarize_remote_room_hiearchy, + ): + result = self.get_success( + self.handler.get_room_hierarchy(self.user, self.space) + ) + self._assert_hierarchy(result, expected) + def test_fed_filtering(self): """ Rooms returned over federation should be properly filtered to only include @@ -657,100 +676,106 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): # Poke an invite over federation into the database. self._poke_fed_invite(invited_room, "@remote:" + fed_hostname) + # Note that these entries are brief, but should contain enough info. 
+ children_rooms = ( + ( + public_room, + { + "room_id": public_room, + "world_readable": False, + "join_rules": JoinRules.PUBLIC, + }, + ), + ( + knock_room, + { + "room_id": knock_room, + "world_readable": False, + "join_rules": JoinRules.KNOCK, + }, + ), + ( + not_invited_room, + { + "room_id": not_invited_room, + "world_readable": False, + "join_rules": JoinRules.INVITE, + }, + ), + ( + invited_room, + { + "room_id": invited_room, + "world_readable": False, + "join_rules": JoinRules.INVITE, + }, + ), + ( + restricted_room, + { + "room_id": restricted_room, + "world_readable": False, + "join_rules": JoinRules.RESTRICTED, + "allowed_spaces": [], + }, + ), + ( + restricted_accessible_room, + { + "room_id": restricted_accessible_room, + "world_readable": False, + "join_rules": JoinRules.RESTRICTED, + "allowed_spaces": [self.room], + }, + ), + ( + world_readable_room, + { + "room_id": world_readable_room, + "world_readable": True, + "join_rules": JoinRules.INVITE, + }, + ), + ( + joined_room, + { + "room_id": joined_room, + "world_readable": False, + "join_rules": JoinRules.INVITE, + }, + ), + ) + + subspace_room_entry = _RoomEntry( + subspace, + { + "room_id": subspace, + "world_readable": True, + }, + # Place each room in the sub-space. + [ + { + "type": EventTypes.SpaceChild, + "room_id": subspace, + "state_key": room_id, + "content": {"via": [fed_hostname]}, + } + for room_id, _ in children_rooms + ], + ) + async def summarize_remote_room( _self, room, suggested_only, max_children, exclude_rooms ): - # Note that these entries are brief, but should contain enough info. 
- rooms = [ - _RoomEntry( - public_room, - { - "room_id": public_room, - "world_readable": False, - "join_rules": JoinRules.PUBLIC, - }, - ), - _RoomEntry( - knock_room, - { - "room_id": knock_room, - "world_readable": False, - "join_rules": JoinRules.KNOCK, - }, - ), - _RoomEntry( - not_invited_room, - { - "room_id": not_invited_room, - "world_readable": False, - "join_rules": JoinRules.INVITE, - }, - ), - _RoomEntry( - invited_room, - { - "room_id": invited_room, - "world_readable": False, - "join_rules": JoinRules.INVITE, - }, - ), - _RoomEntry( - restricted_room, - { - "room_id": restricted_room, - "world_readable": False, - "join_rules": JoinRules.RESTRICTED, - "allowed_spaces": [], - }, - ), - _RoomEntry( - restricted_accessible_room, - { - "room_id": restricted_accessible_room, - "world_readable": False, - "join_rules": JoinRules.RESTRICTED, - "allowed_spaces": [self.room], - }, - ), - _RoomEntry( - world_readable_room, - { - "room_id": world_readable_room, - "world_readable": True, - "join_rules": JoinRules.INVITE, - }, - ), - _RoomEntry( - joined_room, - { - "room_id": joined_room, - "world_readable": False, - "join_rules": JoinRules.INVITE, - }, - ), + return [subspace_room_entry] + [ + # A copy is made of the room data since the allowed_spaces key + # is removed. + _RoomEntry(child_room[0], dict(child_room[1])) + for child_room in children_rooms ] - # Also include the subspace. - rooms.insert( - 0, - _RoomEntry( - subspace, - { - "room_id": subspace, - "world_readable": True, - }, - # Place each room in the sub-space. - [ - { - "room_id": subspace, - "state_key": room.room_id, - "content": {"via": [fed_hostname]}, - } - for room in rooms - ], - ), - ) - return rooms + async def summarize_remote_room_hiearchy(_self, room, suggested_only): + return subspace_room_entry, dict(children_rooms), set() # Add a room to the space which is on another server. 
self._add_child(self.space, subspace, self.token) @@ -788,6 +813,15 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): ] self._assert_rooms(result, expected) + with mock.patch( + "synapse.handlers.space_summary.SpaceSummaryHandler._summarize_remote_room_hiearchy", + new=summarize_remote_room_hiearchy, + ): + result = self.get_success( + self.handler.get_room_hierarchy(self.user, self.space) + ) + self._assert_hierarchy(result, expected) + def test_fed_invited(self): """ A room which the user was invited to should be included in the response. @@ -802,19 +836,22 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): # Poke an invite over federation into the database. self._poke_fed_invite(fed_room, "@remote:" + fed_hostname) + fed_room_entry = _RoomEntry( + fed_room, + { + "room_id": fed_room, + "world_readable": False, + "join_rules": JoinRules.INVITE, + }, + ) + async def summarize_remote_room( _self, room, suggested_only, max_children, exclude_rooms ): - return [ - _RoomEntry( - fed_room, - { - "room_id": fed_room, - "world_readable": False, - "join_rules": JoinRules.INVITE, - }, - ), - ] + return [fed_room_entry] + + async def summarize_remote_room_hiearchy(_self, room, suggested_only): + return fed_room_entry, {}, set() # Add a room to the space which is on another server. self._add_child(self.space, fed_room, self.token) @@ -833,3 +870,12 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): (fed_room, ()), ] self._assert_rooms(result, expected) + + with mock.patch( + "synapse.handlers.space_summary.SpaceSummaryHandler._summarize_remote_room_hiearchy", + new=summarize_remote_room_hiearchy, + ): + result = self.get_success( + self.handler.get_room_hierarchy(self.user, self.space) + ) + self._assert_hierarchy(result, expected) -- cgit 1.5.1 From c4cf0c047329e125f0940281fd53688474d26581 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 17 Aug 2021 08:19:12 -0400 Subject: Attempt to pull from the legacy spaces summary API over federation. 
(#10583) If the new /hierarchy API does not exist on all destinations, fall back to querying the /spaces API and translating the results. This is a backwards compatibility hack since not all of the federated homeservers will update at the same time. --- changelog.d/10583.feature | 1 + synapse/federation/federation_client.py | 64 ++++++++++++++++++++++++++++----- 2 files changed, 56 insertions(+), 9 deletions(-) create mode 100644 changelog.d/10583.feature (limited to 'synapse/federation/federation_client.py') diff --git a/changelog.d/10583.feature b/changelog.d/10583.feature new file mode 100644 index 0000000000..ffc4e4289c --- /dev/null +++ b/changelog.d/10583.feature @@ -0,0 +1 @@ +Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946). diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 0af953a5d6..29979414e3 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1364,13 +1364,59 @@ class FederationClient(FederationBase): return room, children, inaccessible_children - # TODO Fallback to the old federation API and translate the results. - return await self._try_destination_list( - "fetch room hierarchy", - destinations, - send_request, - failover_on_unknown_endpoint=True, - ) + try: + return await self._try_destination_list( + "fetch room hierarchy", + destinations, + send_request, + failover_on_unknown_endpoint=True, + ) + except SynapseError as e: + # Fallback to the old federation API and translate the results if + # no servers implement the new API. + # + # The algorithm below is a bit inefficient as it only attempts to + # get information for the requested room, but the legacy API may + # return additional layers. 
+ if e.code == 502: + legacy_result = await self.get_space_summary( + destinations, + room_id, + suggested_only, + max_rooms_per_space=None, + exclude_rooms=[], + ) + + # Find the requested room in the response (and remove it). + for _i, room in enumerate(legacy_result.rooms): + if room.get("room_id") == room_id: + break + else: + # The requested room was not returned, nothing we can do. + raise + requested_room = legacy_result.rooms.pop(_i) + + # Find any children events of the requested room. + children_events = [] + children_room_ids = set() + for event in legacy_result.events: + if event.room_id == room_id: + children_events.append(event.data) + children_room_ids.add(event.state_key) + # And add them under the requested room. + requested_room["children_state"] = children_events + + # Find the children rooms. + children = [] + for room in legacy_result.rooms: + if room.get("room_id") in children_room_ids: + children.append(room) + + # It isn't clear from the response whether some of the rooms are + # not accessible. 
+ return requested_room, children, () + + raise @attr.s(frozen=True, slots=True, auto_attribs=True) @@ -1430,7 +1476,7 @@ class FederationSpaceSummaryEventResult: class FederationSpaceSummaryResult: """Represents the data returned by a successful get_space_summary call.""" - rooms: Sequence[JsonDict] + rooms: List[JsonDict] events: Sequence[FederationSpaceSummaryEventResult] @classmethod @@ -1444,7 +1490,7 @@ class FederationSpaceSummaryResult: ValueError if d is not a valid /spaces/ response """ rooms = d.get("rooms") - if not isinstance(rooms, Sequence): + if not isinstance(rooms, List): raise ValueError("'rooms' must be a list") if any(not isinstance(r, dict) for r in rooms): raise ValueError("Invalid room in 'rooms' list") -- cgit 1.5.1 From 31dac7ffeeb02f68d1dbe068fd241239e02208dc Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 23 Aug 2021 08:00:25 -0400 Subject: Do not include stack traces for known exceptions when trying multiple federation destinations. (#10662) --- changelog.d/10662.misc | 1 + synapse/federation/federation_client.py | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 changelog.d/10662.misc (limited to 'synapse/federation/federation_client.py') diff --git a/changelog.d/10662.misc b/changelog.d/10662.misc new file mode 100644 index 0000000000..593f9ceaad --- /dev/null +++ b/changelog.d/10662.misc @@ -0,0 +1 @@ +Do not print out stack traces for network errors when fetching data over federation. 
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 29979414e3..44d9e8a5c7 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -43,6 +43,7 @@ from synapse.api.errors import ( Codes, FederationDeniedError, HttpResponseException, + RequestSendFailed, SynapseError, UnsupportedRoomVersionError, ) @@ -558,7 +559,11 @@ class FederationClient(FederationBase): try: return await callback(destination) - except InvalidResponseError as e: + except ( + RequestSendFailed, + InvalidResponseError, + NotRetryingDestination, + ) as e: logger.warning("Failed to %s via %s: %s", description, destination, e) except UnsupportedRoomVersionError: raise -- cgit 1.5.1 From 5548fe097881b543cba37c7cda27ff7efe55025d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 26 Aug 2021 07:16:53 -0400 Subject: Cache the result of fetching the room hierarchy over federation. (#10647) --- changelog.d/10647.misc | 1 + synapse/federation/federation_client.py | 106 ++++++++++++++++++++------------ 2 files changed, 67 insertions(+), 40 deletions(-) create mode 100644 changelog.d/10647.misc (limited to 'synapse/federation/federation_client.py') diff --git a/changelog.d/10647.misc b/changelog.d/10647.misc new file mode 100644 index 0000000000..4407a9030d --- /dev/null +++ b/changelog.d/10647.misc @@ -0,0 +1 @@ +Improve the performance of the `/hierarchy` API (from [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946)) by caching responses received over federation. diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 44d9e8a5c7..1416abd0fb 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -111,6 +111,23 @@ class FederationClient(FederationBase): reset_expiry_on_get=False, ) + # A cache for fetching the room hierarchy over federation. 
+ # + # Some stale data over federation is OK, but must be refreshed + # periodically since the local server is in the room. + # + # It is a map of (room ID, suggested-only) -> the response of + # get_room_hierarchy. + self._get_room_hierarchy_cache: ExpiringCache[ + Tuple[str, bool], Tuple[JsonDict, Sequence[JsonDict], Sequence[str]] + ] = ExpiringCache( + cache_name="get_room_hierarchy_cache", + clock=self._clock, + max_len=1000, + expiry_ms=5 * 60 * 1000, + reset_expiry_on_get=False, + ) + def _clear_tried_cache(self): """Clear pdu_destination_tried cache""" now = self._clock.time_msec() @@ -1324,6 +1341,10 @@ class FederationClient(FederationBase): remote servers """ + cached_result = self._get_room_hierarchy_cache.get((room_id, suggested_only)) + if cached_result: + return cached_result + async def send_request( destination: str, ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]: @@ -1370,58 +1391,63 @@ class FederationClient(FederationBase): return room, children, inaccessible_children try: - return await self._try_destination_list( + result = await self._try_destination_list( "fetch room hierarchy", destinations, send_request, failover_on_unknown_endpoint=True, ) except SynapseError as e: + # If an unexpected error occurred, re-raise it. + if e.code != 502: + raise + # Fallback to the old federation API and translate the results if # no servers implement the new API. # # The algorithm below is a bit inefficient as it only attempts to - # get information for the requested room, but the legacy API may + # parse information for the requested room, but the legacy API may # return additional layers. 
- if e.code == 502: - legacy_result = await self.get_space_summary( - destinations, - room_id, - suggested_only, - max_rooms_per_space=None, - exclude_rooms=[], - ) + legacy_result = await self.get_space_summary( + destinations, + room_id, + suggested_only, + max_rooms_per_space=None, + exclude_rooms=[], + ) - # Find the requested room in the response (and remove it). - for _i, room in enumerate(legacy_result.rooms): - if room.get("room_id") == room_id: - break - else: - # The requested room was not returned, nothing we can do. - raise - requested_room = legacy_result.rooms.pop(_i) - - # Find any children events of the requested room. - children_events = [] - children_room_ids = set() - for event in legacy_result.events: - if event.room_id == room_id: - children_events.append(event.data) - children_room_ids.add(event.state_key) - # And add them under the requested room. - requested_room["children_state"] = children_events - - # Find the children rooms. - children = [] - for room in legacy_result.rooms: - if room.get("room_id") in children_room_ids: - children.append(room) - - # It isn't clear from the response whether some of the rooms are - # not accessible. - return requested_room, children, () - - raise + # Find the requested room in the response (and remove it). + for _i, room in enumerate(legacy_result.rooms): + if room.get("room_id") == room_id: + break + else: + # The requested room was not returned, nothing we can do. + raise + requested_room = legacy_result.rooms.pop(_i) + + # Find any children events of the requested room. + children_events = [] + children_room_ids = set() + for event in legacy_result.events: + if event.room_id == room_id: + children_events.append(event.data) + children_room_ids.add(event.state_key) + # And add them under the requested room. + requested_room["children_state"] = children_events + + # Find the children rooms. 
+ children = [] + for room in legacy_result.rooms: + if room.get("room_id") in children_room_ids: + children.append(room) + + # It isn't clear from the response whether some of the rooms are + # not accessible. + result = (requested_room, children, ()) + + # Cache the result to avoid fetching data over federation every time. + self._get_room_hierarchy_cache[(room_id, suggested_only)] = result + return result @attr.s(frozen=True, slots=True, auto_attribs=True) -- cgit 1.5.1