diff options
author | Patrick Cloke <clokep@users.noreply.github.com> | 2021-08-24 08:14:03 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-24 08:14:03 -0400 |
commit | d12ba52f178982ecb47207471bee14472f9597b6 (patch) | |
tree | 13a2ad91991cfd372c3ee166188965ad3f92fcdd /synapse/handlers/room_summary.py | |
parent | Correctly initialise the `synapse_user_logins` metric. (#10677) (diff) | |
download | synapse-d12ba52f178982ecb47207471bee14472f9597b6.tar.xz |
Persist room hierarchy pagination sessions to the database. (#10613)
Diffstat (limited to 'synapse/handlers/room_summary.py')
-rw-r--r-- | synapse/handlers/room_summary.py | 76 |
1 files changed, 38 insertions, 38 deletions
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py index ac6cfc0da9..906985c754 100644 --- a/synapse/handlers/room_summary.py +++ b/synapse/handlers/room_summary.py @@ -28,12 +28,11 @@ from synapse.api.constants import ( Membership, RoomTypes, ) -from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError +from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError from synapse.events import EventBase from synapse.events.utils import format_event_for_client_v2 from synapse.types import JsonDict from synapse.util.caches.response_cache import ResponseCache -from synapse.util.stringutils import random_string if TYPE_CHECKING: from synapse.server import HomeServer @@ -76,6 +75,9 @@ class _PaginationSession: class RoomSummaryHandler: + # A unique key used for pagination sessions for the room hierarchy endpoint. + _PAGINATION_SESSION_TYPE = "room_hierarchy_pagination" + # The time a pagination session remains valid for. _PAGINATION_SESSION_VALIDITY_PERIOD_MS = 5 * 60 * 1000 @@ -87,12 +89,6 @@ class RoomSummaryHandler: self._server_name = hs.hostname self._federation_client = hs.get_federation_client() - # A map of query information to the current pagination state. - # - # TODO Allow for multiple workers to share this data. - # TODO Expire pagination tokens. - self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {} - # If a user tries to fetch the same page multiple times in quick succession, # only process the first attempt and return its result to subsequent requests. self._pagination_response_cache: ResponseCache[ @@ -102,21 +98,6 @@ class RoomSummaryHandler: "get_room_hierarchy", ) - def _expire_pagination_sessions(self): - """Expire pagination session which are old.""" - expire_before = ( - self._clock.time_msec() - self._PAGINATION_SESSION_VALIDITY_PERIOD_MS - ) - to_expire = [] - - for key, value in self._pagination_sessions.items(): - if value.creation_time_ms < expire_before: - to_expire.append(key) - - for key in to_expire: - logger.debug("Expiring pagination session id %s", key) - del self._pagination_sessions[key] - async def get_space_summary( self, requester: str, @@ -327,18 +308,29 @@ class RoomSummaryHandler: # If this is continuing a previous session, pull the persisted data. if from_token: - self._expire_pagination_sessions() + try: + pagination_session = await self._store.get_session( + session_type=self._PAGINATION_SESSION_TYPE, + session_id=from_token, + ) + except StoreError: + raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM) - pagination_key = _PaginationKey( - requested_room_id, suggested_only, max_depth, from_token - ) - if pagination_key not in self._pagination_sessions: + # If the requester, room ID, suggested-only, or max depth were modified + # the session is invalid. + if ( + requester != pagination_session["requester"] + or requested_room_id != pagination_session["room_id"] + or suggested_only != pagination_session["suggested_only"] + or max_depth != pagination_session["max_depth"] + ): raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM) # Load the previous state. - pagination_session = self._pagination_sessions[pagination_key] - room_queue = pagination_session.room_queue - processed_rooms = pagination_session.processed_rooms + room_queue = [ + _RoomQueueEntry(*fields) for fields in pagination_session["room_queue"] + ] + processed_rooms = set(pagination_session["processed_rooms"]) else: # The queue of rooms to process, the next room is last on the stack. room_queue = [_RoomQueueEntry(requested_room_id, ())] @@ -456,13 +448,21 @@ class RoomSummaryHandler: # If there's additional data, generate a pagination token (and persist state). if room_queue: - next_batch = random_string(24) - result["next_batch"] = next_batch - pagination_key = _PaginationKey( - requested_room_id, suggested_only, max_depth, next_batch - ) - self._pagination_sessions[pagination_key] = _PaginationSession( - self._clock.time_msec(), room_queue, processed_rooms + result["next_batch"] = await self._store.create_session( + session_type=self._PAGINATION_SESSION_TYPE, + value={ + # Information which must be identical across pagination. + "requester": requester, + "room_id": requested_room_id, + "suggested_only": suggested_only, + "max_depth": max_depth, + # The stored state. + "room_queue": [ + attr.astuple(room_entry) for room_entry in room_queue + ], + "processed_rooms": list(processed_rooms), + }, + expiry_ms=self._PAGINATION_SESSION_VALIDITY_PERIOD_MS, ) return result |