summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/space_summary.py201
1 files changed, 198 insertions, 3 deletions
diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py
index d04afe6c31..fd76c34695 100644
--- a/synapse/handlers/space_summary.py
+++ b/synapse/handlers/space_summary.py
@@ -18,7 +18,7 @@ import re
 from collections import deque
 from typing import (
     TYPE_CHECKING,
-    Collection,
+    Deque,
     Dict,
     Iterable,
     List,
@@ -38,9 +38,12 @@ from synapse.api.constants import (
     Membership,
     RoomTypes,
 )
+from synapse.api.errors import Codes, SynapseError
 from synapse.events import EventBase
 from synapse.events.utils import format_event_for_client_v2
 from synapse.types import JsonDict
+from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -57,6 +60,29 @@ MAX_ROOMS_PER_SPACE = 50
 MAX_SERVERS_PER_SPACE = 3
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _PaginationKey:
+    """The key used to find unique pagination session."""
+
+    # The first three entries match the request parameters (and cannot change
+    # during a pagination session).
+    room_id: str
+    suggested_only: bool
+    max_depth: Optional[int]
+    # The randomly generated token.
+    token: str
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _PaginationSession:
+    """The information that is stored for pagination."""
+
+    # The queue of rooms which are still to process.
+    room_queue: Deque["_RoomQueueEntry"]
+    # A set of rooms which have been processed.
+    processed_rooms: Set[str]
+
+
 class SpaceSummaryHandler:
     def __init__(self, hs: "HomeServer"):
         self._clock = hs.get_clock()
@@ -67,6 +93,21 @@ class SpaceSummaryHandler:
         self._server_name = hs.hostname
         self._federation_client = hs.get_federation_client()
 
+        # A map of query information to the current pagination state.
+        #
+        # TODO Allow for multiple workers to share this data.
+        # TODO Expire pagination tokens.
+        self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {}
+
+        # If a user tries to fetch the same page multiple times in quick succession,
+        # only process the first attempt and return its result to subsequent requests.
+        self._pagination_response_cache: ResponseCache[
+            Tuple[str, bool, Optional[int], Optional[int], Optional[str]]
+        ] = ResponseCache(
+            hs.get_clock(),
+            "get_room_hierarchy",
+        )
+
     async def get_space_summary(
         self,
         requester: str,
@@ -130,7 +171,7 @@ class SpaceSummaryHandler:
                     requester, None, room_id, suggested_only, max_children
                 )
 
-                events: Collection[JsonDict] = []
+                events: Sequence[JsonDict] = []
                 if room_entry:
                     rooms_result.append(room_entry.room)
                     events = room_entry.children
@@ -207,6 +248,154 @@ class SpaceSummaryHandler:
 
         return {"rooms": rooms_result, "events": events_result}
 
+    async def get_room_hierarchy(
+        self,
+        requester: str,
+        requested_room_id: str,
+        suggested_only: bool = False,
+        max_depth: Optional[int] = None,
+        limit: Optional[int] = None,
+        from_token: Optional[str] = None,
+    ) -> JsonDict:
+        """
+        Implementation of the room hierarchy C-S API.
+
+        Args:
+            requester: The user ID of the user making this request.
+            requested_room_id: The room ID to start the hierarchy at (the "root" room).
+            suggested_only: Whether we should only return children with the "suggested"
+                flag set.
+            max_depth: The maximum depth in the tree to explore, must be a
+                non-negative integer.
+
+                0 would correspond to just the root room, 1 would include just
+                the root room's children, etc.
+            limit: An optional limit on the number of rooms to return per
+                page. Must be a positive integer.
+            from_token: An optional pagination token.
+
+        Returns:
+            The JSON hierarchy dictionary.
+        """
+        # If a user tries to fetch the same page multiple times in quick succession,
+        # only process the first attempt and return its result to subsequent requests.
+        #
+        # This is due to the pagination process mutating internal state, attempting
+        # to process multiple requests for the same page will result in errors.
+        return await self._pagination_response_cache.wrap(
+            (requested_room_id, suggested_only, max_depth, limit, from_token),
+            self._get_room_hierarchy,
+            requester,
+            requested_room_id,
+            suggested_only,
+            max_depth,
+            limit,
+            from_token,
+        )
+
+    async def _get_room_hierarchy(
+        self,
+        requester: str,
+        requested_room_id: str,
+        suggested_only: bool = False,
+        max_depth: Optional[int] = None,
+        limit: Optional[int] = None,
+        from_token: Optional[str] = None,
+    ) -> JsonDict:
+        """See docstring for SpaceSummaryHandler.get_room_hierarchy."""
+
+        # first of all, check that the user is in the room in question (or it's
+        # world-readable)
+        await self._auth.check_user_in_room_or_world_readable(
+            requested_room_id, requester
+        )
+
+        # If this is continuing a previous session, pull the persisted data.
+        if from_token:
+            pagination_key = _PaginationKey(
+                requested_room_id, suggested_only, max_depth, from_token
+            )
+            if pagination_key not in self._pagination_sessions:
+                raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)
+
+            # Load the previous state.
+            pagination_session = self._pagination_sessions[pagination_key]
+            room_queue = pagination_session.room_queue
+            processed_rooms = pagination_session.processed_rooms
+        else:
+            # the queue of rooms to process
+            room_queue = deque((_RoomQueueEntry(requested_room_id, ()),))
+
+            # Rooms we have already processed.
+            processed_rooms = set()
+
+        rooms_result: List[JsonDict] = []
+
+        # Cap the limit to a server-side maximum.
+        if limit is None:
+            limit = MAX_ROOMS
+        else:
+            limit = min(limit, MAX_ROOMS)
+
+        # Iterate through the queue until we reach the limit or run out of
+        # rooms to include.
+        while room_queue and len(rooms_result) < limit:
+            queue_entry = room_queue.popleft()
+            room_id = queue_entry.room_id
+            current_depth = queue_entry.depth
+            if room_id in processed_rooms:
+                # already done this room
+                continue
+
+            logger.debug("Processing room %s", room_id)
+
+            is_in_room = await self._store.is_host_joined(room_id, self._server_name)
+            if is_in_room:
+                room_entry = await self._summarize_local_room(
+                    requester,
+                    None,
+                    room_id,
+                    suggested_only,
+                    # TODO Handle max children.
+                    max_children=None,
+                )
+
+                if room_entry:
+                    rooms_result.append(room_entry.as_json())
+
+                    # Add the child to the queue. We have already validated
+                    # that the vias are a list of server names.
+                    #
+                    # If the current depth is the maximum depth, do not queue
+                    # more entries.
+                    if max_depth is None or current_depth < max_depth:
+                        room_queue.extendleft(
+                            _RoomQueueEntry(
+                                ev["state_key"], ev["content"]["via"], current_depth + 1
+                            )
+                            for ev in reversed(room_entry.children)
+                        )
+
+                processed_rooms.add(room_id)
+            else:
+                # TODO Federation.
+                pass
+
+        result: JsonDict = {"rooms": rooms_result}
+
+        # If there's additional data, generate a pagination token (and persist state).
+        if room_queue:
+            next_token = random_string(24)
+            result["next_token"] = next_token
+            pagination_key = _PaginationKey(
+                requested_room_id, suggested_only, max_depth, next_token
+            )
+            self._pagination_sessions[pagination_key] = _PaginationSession(
+                room_queue, processed_rooms
+            )
+
+        return result
+
     async def federation_space_summary(
         self,
         origin: str,
@@ -652,6 +841,7 @@ class SpaceSummaryHandler:
 class _RoomQueueEntry:
     room_id: str
     via: Sequence[str]
+    depth: int = 0
 
 
 @attr.s(frozen=True, slots=True, auto_attribs=True)
@@ -662,7 +852,12 @@ class _RoomEntry:
     # An iterable of the sorted, stripped children events for children of this room.
     #
     # This may not include all children.
-    children: Collection[JsonDict] = ()
+    children: Sequence[JsonDict] = ()
+
+    def as_json(self) -> JsonDict:
+        result = dict(self.room)
+        result["children_state"] = self.children
+        return result
 
 
 def _has_valid_via(e: EventBase) -> bool: