diff options
Diffstat (limited to '')
-rw-r--r-- | changelog.d/10613.feature | 1 | ||||
-rw-r--r-- | mypy.ini | 1 | ||||
-rw-r--r-- | synapse/app/generic_worker.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room_summary.py | 76 | ||||
-rw-r--r-- | synapse/storage/databases/main/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/session.py | 145 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/62/02session_store.sql | 23 |
7 files changed, 212 insertions, 38 deletions
diff --git a/changelog.d/10613.feature b/changelog.d/10613.feature new file mode 100644 index 0000000000..ffc4e4289c --- /dev/null +++ b/changelog.d/10613.feature @@ -0,0 +1 @@ +Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946). diff --git a/mypy.ini b/mypy.ini index b17872211e..745e6b78eb 100644 --- a/mypy.ini +++ b/mypy.ini @@ -57,6 +57,7 @@ files = synapse/storage/databases/main/keys.py, synapse/storage/databases/main/pusher.py, synapse/storage/databases/main/registration.py, + synapse/storage/databases/main/session.py, synapse/storage/databases/main/stream.py, synapse/storage/databases/main/ui_auth.py, synapse/storage/database.py, diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index fd2626dbe1..9b71dd75e6 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -118,6 +118,7 @@ from synapse.storage.databases.main.monthly_active_users import ( from synapse.storage.databases.main.presence import PresenceStore from synapse.storage.databases.main.room import RoomWorkerStore from synapse.storage.databases.main.search import SearchStore +from synapse.storage.databases.main.session import SessionStore from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.transactions import TransactionWorkerStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore @@ -253,6 +254,7 @@ class GenericWorkerSlavedStore( SearchStore, TransactionWorkerStore, LockStore, + SessionStore, BaseSlavedStore, ): pass diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py index ac6cfc0da9..906985c754 100644 --- a/synapse/handlers/room_summary.py +++ b/synapse/handlers/room_summary.py @@ -28,12 +28,11 @@ from synapse.api.constants import ( Membership, RoomTypes, ) -from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError +from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError from synapse.events import EventBase from synapse.events.utils import format_event_for_client_v2 from synapse.types import JsonDict from synapse.util.caches.response_cache import ResponseCache -from synapse.util.stringutils import random_string if TYPE_CHECKING: from synapse.server import HomeServer @@ -76,6 +75,9 @@ class _PaginationSession: class RoomSummaryHandler: + # A unique key used for pagination sessions for the room hierarchy endpoint. + _PAGINATION_SESSION_TYPE = "room_hierarchy_pagination" + # The time a pagination session remains valid for. _PAGINATION_SESSION_VALIDITY_PERIOD_MS = 5 * 60 * 1000 @@ -87,12 +89,6 @@ class RoomSummaryHandler: self._server_name = hs.hostname self._federation_client = hs.get_federation_client() - # A map of query information to the current pagination state. - # - # TODO Allow for multiple workers to share this data. - # TODO Expire pagination tokens. - self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {} - # If a user tries to fetch the same page multiple times in quick succession, # only process the first attempt and return its result to subsequent requests. self._pagination_response_cache: ResponseCache[ @@ -102,21 +98,6 @@ class RoomSummaryHandler: "get_room_hierarchy", ) - def _expire_pagination_sessions(self): - """Expire pagination session which are old.""" - expire_before = ( - self._clock.time_msec() - self._PAGINATION_SESSION_VALIDITY_PERIOD_MS - ) - to_expire = [] - - for key, value in self._pagination_sessions.items(): - if value.creation_time_ms < expire_before: - to_expire.append(key) - - for key in to_expire: - logger.debug("Expiring pagination session id %s", key) - del self._pagination_sessions[key] - async def get_space_summary( self, requester: str, @@ -327,18 +308,29 @@ class RoomSummaryHandler: # If this is continuing a previous session, pull the persisted data. if from_token: - self._expire_pagination_sessions() + try: + pagination_session = await self._store.get_session( + session_type=self._PAGINATION_SESSION_TYPE, + session_id=from_token, + ) + except StoreError: + raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM) - pagination_key = _PaginationKey( - requested_room_id, suggested_only, max_depth, from_token - ) - if pagination_key not in self._pagination_sessions: + # If the requester, room ID, suggested-only, or max depth were modified + # the session is invalid. + if ( + requester != pagination_session["requester"] + or requested_room_id != pagination_session["room_id"] + or suggested_only != pagination_session["suggested_only"] + or max_depth != pagination_session["max_depth"] + ): raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM) # Load the previous state. - pagination_session = self._pagination_sessions[pagination_key] - room_queue = pagination_session.room_queue - processed_rooms = pagination_session.processed_rooms + room_queue = [ + _RoomQueueEntry(*fields) for fields in pagination_session["room_queue"] + ] + processed_rooms = set(pagination_session["processed_rooms"]) else: # The queue of rooms to process, the next room is last on the stack. room_queue = [_RoomQueueEntry(requested_room_id, ())] @@ -456,13 +448,21 @@ class RoomSummaryHandler: # If there's additional data, generate a pagination token (and persist state). if room_queue: - next_batch = random_string(24) - result["next_batch"] = next_batch - pagination_key = _PaginationKey( - requested_room_id, suggested_only, max_depth, next_batch - ) - self._pagination_sessions[pagination_key] = _PaginationSession( - self._clock.time_msec(), room_queue, processed_rooms + result["next_batch"] = await self._store.create_session( + session_type=self._PAGINATION_SESSION_TYPE, + value={ + # Information which must be identical across pagination. + "requester": requester, + "room_id": requested_room_id, + "suggested_only": suggested_only, + "max_depth": max_depth, + # The stored state. + "room_queue": [ + attr.astuple(room_entry) for room_entry in room_queue + ], + "processed_rooms": list(processed_rooms), + }, + expiry_ms=self._PAGINATION_SESSION_VALIDITY_PERIOD_MS, ) return result diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 01b918e12e..00a644e8f7 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -63,6 +63,7 @@ from .relations import RelationsStore from .room import RoomStore from .roommember import RoomMemberStore from .search import SearchStore +from .session import SessionStore from .signatures import SignatureStore from .state import StateStore from .stats import StatsStore @@ -121,6 +122,7 @@ class DataStore( ServerMetricsStore, EventForwardExtremitiesStore, LockStore, + SessionStore, ): def __init__(self, database: DatabasePool, db_conn, hs): self.hs = hs diff --git a/synapse/storage/databases/main/session.py b/synapse/storage/databases/main/session.py new file mode 100644 index 0000000000..172f27d109 --- /dev/null +++ b/synapse/storage/databases/main/session.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import TYPE_CHECKING + +import synapse.util.stringutils as stringutils +from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import wrap_as_background_process +from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) +from synapse.types import JsonDict +from synapse.util import json_encoder + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +class SessionStore(SQLBaseStore): + """ + A store for generic session data. + + Each type of session should provide a unique type (to separate sessions). + + Sessions are automatically removed when they expire. + """ + + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + # Create a background job for culling expired sessions. + if hs.config.run_background_tasks: + self._clock.looping_call(self._delete_expired_sessions, 30 * 60 * 1000) + + async def create_session( + self, session_type: str, value: JsonDict, expiry_ms: int + ) -> str: + """ + Creates a new pagination session for the room hierarchy endpoint. + + Args: + session_type: The type for this session. + value: The value to store. + expiry_ms: How long before an item is evicted from the cache + in milliseconds. Default is 0, indicating items never get + evicted based on time. + + Returns: + The newly created session ID. + + Raises: + StoreError if a unique session ID cannot be generated. + """ + # autogen a session ID and try to create it. We may clash, so just + # try a few times till one goes through, giving up eventually. + attempts = 0 + while attempts < 5: + session_id = stringutils.random_string(24) + + try: + await self.db_pool.simple_insert( + table="sessions", + values={ + "session_id": session_id, + "session_type": session_type, + "value": json_encoder.encode(value), + "expiry_time_ms": self.hs.get_clock().time_msec() + expiry_ms, + }, + desc="create_session", + ) + + return session_id + except self.db_pool.engine.module.IntegrityError: + attempts += 1 + raise StoreError(500, "Couldn't generate a session ID.") + + async def get_session(self, session_type: str, session_id: str) -> JsonDict: + """ + Retrieve data stored with create_session + + Args: + session_type: The type for this session. + session_id: The session ID returned from create_session. + + Raises: + StoreError if the session cannot be found. + """ + + def _get_session( + txn: LoggingTransaction, session_type: str, session_id: str, ts: int + ) -> JsonDict: + # This includes the expiry time since items are only periodically + # deleted, not upon expiry. + select_sql = """ + SELECT value FROM sessions WHERE + session_type = ? AND session_id = ? AND expiry_time_ms > ? + """ + txn.execute(select_sql, [session_type, session_id, ts]) + row = txn.fetchone() + + if not row: + raise StoreError(404, "No session") + + return db_to_json(row[0]) + + return await self.db_pool.runInteraction( + "get_session", + _get_session, + session_type, + session_id, + self._clock.time_msec(), + ) + + @wrap_as_background_process("delete_expired_sessions") + async def _delete_expired_sessions(self) -> None: + """Remove sessions with expiry dates that have passed.""" + + def _delete_expired_sessions_txn(txn: LoggingTransaction, ts: int) -> None: + sql = "DELETE FROM sessions WHERE expiry_time_ms <= ?" + txn.execute(sql, (ts,)) + + await self.db_pool.runInteraction( + "delete_expired_sessions", + _delete_expired_sessions_txn, + self._clock.time_msec(), + ) diff --git a/synapse/storage/schema/main/delta/62/02session_store.sql b/synapse/storage/schema/main/delta/62/02session_store.sql new file mode 100644 index 0000000000..535fb34c10 --- /dev/null +++ b/synapse/storage/schema/main/delta/62/02session_store.sql @@ -0,0 +1,23 @@ +/* + * Copyright 2021 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS sessions( + session_type TEXT NOT NULL, -- The unique key for this type of session. + session_id TEXT NOT NULL, -- The session ID passed to the client. + value TEXT NOT NULL, -- A JSON dictionary to persist. + expiry_time_ms BIGINT NOT NULL, -- The time this session will expire (epoch time in milliseconds). + UNIQUE (session_type, session_id) +); |