diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py
index d04afe6c31..fd76c34695 100644
--- a/synapse/handlers/space_summary.py
+++ b/synapse/handlers/space_summary.py
@@ -18,7 +18,7 @@ import re
from collections import deque
from typing import (
TYPE_CHECKING,
- Collection,
+ Deque,
Dict,
Iterable,
List,
@@ -38,9 +38,12 @@ from synapse.api.constants import (
Membership,
RoomTypes,
)
+from synapse.api.errors import Codes, SynapseError
from synapse.events import EventBase
from synapse.events.utils import format_event_for_client_v2
from synapse.types import JsonDict
+from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -57,6 +60,29 @@ MAX_ROOMS_PER_SPACE = 50
MAX_SERVERS_PER_SPACE = 3
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _PaginationKey:
+ """The key used to find unique pagination session."""
+
+ # The first three entries match the request parameters (and cannot change
+ # during a pagination session).
+ room_id: str
+ suggested_only: bool
+ max_depth: Optional[int]
+ # The randomly generated token.
+ token: str
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _PaginationSession:
+ """The information that is stored for pagination."""
+
+ # The queue of rooms which are still to process.
+ room_queue: Deque["_RoomQueueEntry"]
+ # A set of rooms which have been processed.
+ processed_rooms: Set[str]
+
+
class SpaceSummaryHandler:
def __init__(self, hs: "HomeServer"):
self._clock = hs.get_clock()
@@ -67,6 +93,21 @@ class SpaceSummaryHandler:
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()
+ # A map of query information to the current pagination state.
+ #
+ # TODO Allow for multiple workers to share this data.
+ # TODO Expire pagination tokens.
+ self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {}
+
+ # If a user tries to fetch the same page multiple times in quick succession,
+ # only process the first attempt and return its result to subsequent requests.
+ self._pagination_response_cache: ResponseCache[
+ Tuple[str, bool, Optional[int], Optional[int], Optional[str]]
+ ] = ResponseCache(
+ hs.get_clock(),
+ "get_room_hierarchy",
+ )
+
async def get_space_summary(
self,
requester: str,
@@ -130,7 +171,7 @@ class SpaceSummaryHandler:
requester, None, room_id, suggested_only, max_children
)
- events: Collection[JsonDict] = []
+ events: Sequence[JsonDict] = []
if room_entry:
rooms_result.append(room_entry.room)
events = room_entry.children
@@ -207,6 +248,154 @@ class SpaceSummaryHandler:
return {"rooms": rooms_result, "events": events_result}
+ async def get_room_hierarchy(
+ self,
+ requester: str,
+ requested_room_id: str,
+ suggested_only: bool = False,
+ max_depth: Optional[int] = None,
+ limit: Optional[int] = None,
+ from_token: Optional[str] = None,
+ ) -> JsonDict:
+ """
+ Implementation of the room hierarchy C-S API.
+
+ Args:
+ requester: The user ID of the user making this request.
+ requested_room_id: The room ID to start the hierarchy at (the "root" room).
+ suggested_only: Whether we should only return children with the "suggested"
+ flag set.
+ max_depth: The maximum depth in the tree to explore, must be a
+ non-negative integer.
+
+ 0 would correspond to just the root room, 1 would include just
+ the root room's children, etc.
+ limit: An optional limit on the number of rooms to return per
+ page. Must be a positive integer.
+ from_token: An optional pagination token.
+
+ Returns:
+ The JSON hierarchy dictionary.
+ """
+ # If a user tries to fetch the same page multiple times in quick succession,
+ # only process the first attempt and return its result to subsequent requests.
+ #
+ # This is due to the pagination process mutating internal state, attempting
+ # to process multiple requests for the same page will result in errors.
+ return await self._pagination_response_cache.wrap(
+ (requested_room_id, suggested_only, max_depth, limit, from_token),
+ self._get_room_hierarchy,
+ requester,
+ requested_room_id,
+ suggested_only,
+ max_depth,
+ limit,
+ from_token,
+ )
+
+ async def _get_room_hierarchy(
+ self,
+ requester: str,
+ requested_room_id: str,
+ suggested_only: bool = False,
+ max_depth: Optional[int] = None,
+ limit: Optional[int] = None,
+ from_token: Optional[str] = None,
+ ) -> JsonDict:
+ """See docstring for SpaceSummaryHandler.get_room_hierarchy."""
+
+ # first of all, check that the user is in the room in question (or it's
+ # world-readable)
+ await self._auth.check_user_in_room_or_world_readable(
+ requested_room_id, requester
+ )
+
+ # If this is continuing a previous session, pull the persisted data.
+ if from_token:
+ pagination_key = _PaginationKey(
+ requested_room_id, suggested_only, max_depth, from_token
+ )
+ if pagination_key not in self._pagination_sessions:
+ raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)
+
+ # Load the previous state.
+ pagination_session = self._pagination_sessions[pagination_key]
+ room_queue = pagination_session.room_queue
+ processed_rooms = pagination_session.processed_rooms
+ else:
+ # the queue of rooms to process
+ room_queue = deque((_RoomQueueEntry(requested_room_id, ()),))
+
+ # Rooms we have already processed.
+ processed_rooms = set()
+
+ rooms_result: List[JsonDict] = []
+
+ # Cap the limit to a server-side maximum.
+ if limit is None:
+ limit = MAX_ROOMS
+ else:
+ limit = min(limit, MAX_ROOMS)
+
+ # Iterate through the queue until we reach the limit or run out of
+ # rooms to include.
+ while room_queue and len(rooms_result) < limit:
+ queue_entry = room_queue.popleft()
+ room_id = queue_entry.room_id
+ current_depth = queue_entry.depth
+ if room_id in processed_rooms:
+ # already done this room
+ continue
+
+ logger.debug("Processing room %s", room_id)
+
+ is_in_room = await self._store.is_host_joined(room_id, self._server_name)
+ if is_in_room:
+ room_entry = await self._summarize_local_room(
+ requester,
+ None,
+ room_id,
+ suggested_only,
+ # TODO Handle max children.
+ max_children=None,
+ )
+
+ if room_entry:
+ rooms_result.append(room_entry.as_json())
+
+ # Add the child to the queue. We have already validated
+ # that the vias are a list of server names.
+ #
+ # If the current depth is the maximum depth, do not queue
+ # more entries.
+ if max_depth is None or current_depth < max_depth:
+ room_queue.extendleft(
+ _RoomQueueEntry(
+ ev["state_key"], ev["content"]["via"], current_depth + 1
+ )
+ for ev in reversed(room_entry.children)
+ )
+
+ processed_rooms.add(room_id)
+ else:
+ # TODO Federation.
+ pass
+
+ result: JsonDict = {"rooms": rooms_result}
+
+ # If there's additional data, generate a pagination token (and persist state).
+ if room_queue:
+ next_token = random_string(24)
+ result["next_token"] = next_token
+ pagination_key = _PaginationKey(
+ requested_room_id, suggested_only, max_depth, next_token
+ )
+ self._pagination_sessions[pagination_key] = _PaginationSession(
+ room_queue, processed_rooms
+ )
+
+ return result
+
async def federation_space_summary(
self,
origin: str,
@@ -652,6 +841,7 @@ class SpaceSummaryHandler:
class _RoomQueueEntry:
room_id: str
via: Sequence[str]
+ depth: int = 0
@attr.s(frozen=True, slots=True, auto_attribs=True)
@@ -662,7 +852,12 @@ class _RoomEntry:
# An iterable of the sorted, stripped children events for children of this room.
#
# This may not include all children.
- children: Collection[JsonDict] = ()
+ children: Sequence[JsonDict] = ()
+
+ def as_json(self) -> JsonDict:
+ result = dict(self.room)
+ result["children_state"] = self.children
+ return result
def _has_valid_via(e: EventBase) -> bool:
|