summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/10613.feature1
-rw-r--r--mypy.ini1
-rw-r--r--synapse/app/generic_worker.py2
-rw-r--r--synapse/handlers/room_summary.py76
-rw-r--r--synapse/storage/databases/main/__init__.py2
-rw-r--r--synapse/storage/databases/main/session.py145
-rw-r--r--synapse/storage/schema/main/delta/62/02session_store.sql23
7 files changed, 212 insertions, 38 deletions
diff --git a/changelog.d/10613.feature b/changelog.d/10613.feature
new file mode 100644
index 0000000000..ffc4e4289c
--- /dev/null
+++ b/changelog.d/10613.feature
@@ -0,0 +1 @@
+Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946).
diff --git a/mypy.ini b/mypy.ini
index b17872211e..745e6b78eb 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -57,6 +57,7 @@ files =
   synapse/storage/databases/main/keys.py,
   synapse/storage/databases/main/pusher.py,
   synapse/storage/databases/main/registration.py,
+  synapse/storage/databases/main/session.py,
   synapse/storage/databases/main/stream.py,
   synapse/storage/databases/main/ui_auth.py,
   synapse/storage/database.py,
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index fd2626dbe1..9b71dd75e6 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -118,6 +118,7 @@ from synapse.storage.databases.main.monthly_active_users import (
 from synapse.storage.databases.main.presence import PresenceStore
 from synapse.storage.databases.main.room import RoomWorkerStore
 from synapse.storage.databases.main.search import SearchStore
+from synapse.storage.databases.main.session import SessionStore
 from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.databases.main.transactions import TransactionWorkerStore
 from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
@@ -253,6 +254,7 @@ class GenericWorkerSlavedStore(
     SearchStore,
     TransactionWorkerStore,
     LockStore,
+    SessionStore,
     BaseSlavedStore,
 ):
     pass
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index ac6cfc0da9..906985c754 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -28,12 +28,11 @@ from synapse.api.constants import (
     Membership,
     RoomTypes,
 )
-from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
+from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
 from synapse.events import EventBase
 from synapse.events.utils import format_event_for_client_v2
 from synapse.types import JsonDict
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -76,6 +75,9 @@ class _PaginationSession:
 
 
 class RoomSummaryHandler:
+    # A unique key used for pagination sessions for the room hierarchy endpoint.
+    _PAGINATION_SESSION_TYPE = "room_hierarchy_pagination"
+
     # The time a pagination session remains valid for.
     _PAGINATION_SESSION_VALIDITY_PERIOD_MS = 5 * 60 * 1000
 
@@ -87,12 +89,6 @@ class RoomSummaryHandler:
         self._server_name = hs.hostname
         self._federation_client = hs.get_federation_client()
 
-        # A map of query information to the current pagination state.
-        #
-        # TODO Allow for multiple workers to share this data.
-        # TODO Expire pagination tokens.
-        self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {}
-
         # If a user tries to fetch the same page multiple times in quick succession,
         # only process the first attempt and return its result to subsequent requests.
         self._pagination_response_cache: ResponseCache[
@@ -102,21 +98,6 @@ class RoomSummaryHandler:
             "get_room_hierarchy",
         )
 
-    def _expire_pagination_sessions(self):
-        """Expire pagination session which are old."""
-        expire_before = (
-            self._clock.time_msec() - self._PAGINATION_SESSION_VALIDITY_PERIOD_MS
-        )
-        to_expire = []
-
-        for key, value in self._pagination_sessions.items():
-            if value.creation_time_ms < expire_before:
-                to_expire.append(key)
-
-        for key in to_expire:
-            logger.debug("Expiring pagination session id %s", key)
-            del self._pagination_sessions[key]
-
     async def get_space_summary(
         self,
         requester: str,
@@ -327,18 +308,29 @@ class RoomSummaryHandler:
 
         # If this is continuing a previous session, pull the persisted data.
         if from_token:
-            self._expire_pagination_sessions()
+            try:
+                pagination_session = await self._store.get_session(
+                    session_type=self._PAGINATION_SESSION_TYPE,
+                    session_id=from_token,
+                )
+            except StoreError:
+                raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)
 
-            pagination_key = _PaginationKey(
-                requested_room_id, suggested_only, max_depth, from_token
-            )
-            if pagination_key not in self._pagination_sessions:
+            # If the requester, room ID, suggested-only, or max depth were modified
+            # the session is invalid.
+            if (
+                requester != pagination_session["requester"]
+                or requested_room_id != pagination_session["room_id"]
+                or suggested_only != pagination_session["suggested_only"]
+                or max_depth != pagination_session["max_depth"]
+            ):
                 raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)
 
             # Load the previous state.
-            pagination_session = self._pagination_sessions[pagination_key]
-            room_queue = pagination_session.room_queue
-            processed_rooms = pagination_session.processed_rooms
+            room_queue = [
+                _RoomQueueEntry(*fields) for fields in pagination_session["room_queue"]
+            ]
+            processed_rooms = set(pagination_session["processed_rooms"])
         else:
             # The queue of rooms to process, the next room is last on the stack.
             room_queue = [_RoomQueueEntry(requested_room_id, ())]
@@ -456,13 +448,21 @@ class RoomSummaryHandler:
 
         # If there's additional data, generate a pagination token (and persist state).
         if room_queue:
-            next_batch = random_string(24)
-            result["next_batch"] = next_batch
-            pagination_key = _PaginationKey(
-                requested_room_id, suggested_only, max_depth, next_batch
-            )
-            self._pagination_sessions[pagination_key] = _PaginationSession(
-                self._clock.time_msec(), room_queue, processed_rooms
+            result["next_batch"] = await self._store.create_session(
+                session_type=self._PAGINATION_SESSION_TYPE,
+                value={
+                    # Information which must be identical across pagination.
+                    "requester": requester,
+                    "room_id": requested_room_id,
+                    "suggested_only": suggested_only,
+                    "max_depth": max_depth,
+                    # The stored state.
+                    "room_queue": [
+                        attr.astuple(room_entry) for room_entry in room_queue
+                    ],
+                    "processed_rooms": list(processed_rooms),
+                },
+                expiry_ms=self._PAGINATION_SESSION_VALIDITY_PERIOD_MS,
             )
 
         return result
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 01b918e12e..00a644e8f7 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -63,6 +63,7 @@ from .relations import RelationsStore
 from .room import RoomStore
 from .roommember import RoomMemberStore
 from .search import SearchStore
+from .session import SessionStore
 from .signatures import SignatureStore
 from .state import StateStore
 from .stats import StatsStore
@@ -121,6 +122,7 @@ class DataStore(
     ServerMetricsStore,
     EventForwardExtremitiesStore,
     LockStore,
+    SessionStore,
 ):
     def __init__(self, database: DatabasePool, db_conn, hs):
         self.hs = hs
diff --git a/synapse/storage/databases/main/session.py b/synapse/storage/databases/main/session.py
new file mode 100644
index 0000000000..172f27d109
--- /dev/null
+++ b/synapse/storage/databases/main/session.py
@@ -0,0 +1,145 @@
+# -*- coding: utf-8 -*-
+#  Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+from typing import TYPE_CHECKING
+
+import synapse.util.stringutils as stringutils
+from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.storage.database import (
+    DatabasePool,
+    LoggingDatabaseConnection,
+    LoggingTransaction,
+)
+from synapse.types import JsonDict
+from synapse.util import json_encoder
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+
+class SessionStore(SQLBaseStore):
+    """
+    A store for generic session data.
+
+    Each type of session should provide a unique type (to separate sessions).
+
+    Sessions are automatically removed when they expire.
+    """
+
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        # Create a background job for culling expired sessions.
+        if hs.config.run_background_tasks:
+            self._clock.looping_call(self._delete_expired_sessions, 30 * 60 * 1000)
+
+    async def create_session(
+        self, session_type: str, value: JsonDict, expiry_ms: int
+    ) -> str:
+        """
+        Creates a new pagination session for the room hierarchy endpoint.
+
+        Args:
+            session_type: The type for this session.
+            value: The value to store.
+            expiry_ms: How long before an item is evicted from the cache
+                in milliseconds. Default is 0, indicating items never get
+                evicted based on time.
+
+        Returns:
+            The newly created session ID.
+
+        Raises:
+            StoreError if a unique session ID cannot be generated.
+        """
+        # autogen a session ID and try to create it. We may clash, so just
+        # try a few times till one goes through, giving up eventually.
+        attempts = 0
+        while attempts < 5:
+            session_id = stringutils.random_string(24)
+
+            try:
+                await self.db_pool.simple_insert(
+                    table="sessions",
+                    values={
+                        "session_id": session_id,
+                        "session_type": session_type,
+                        "value": json_encoder.encode(value),
+                        "expiry_time_ms": self.hs.get_clock().time_msec() + expiry_ms,
+                    },
+                    desc="create_session",
+                )
+
+                return session_id
+            except self.db_pool.engine.module.IntegrityError:
+                attempts += 1
+        raise StoreError(500, "Couldn't generate a session ID.")
+
+    async def get_session(self, session_type: str, session_id: str) -> JsonDict:
+        """
+        Retrieve data stored with create_session
+
+        Args:
+            session_type: The type for this session.
+            session_id: The session ID returned from create_session.
+
+        Raises:
+            StoreError if the session cannot be found.
+        """
+
+        def _get_session(
+            txn: LoggingTransaction, session_type: str, session_id: str, ts: int
+        ) -> JsonDict:
+            # This includes the expiry time since items are only periodically
+            # deleted, not upon expiry.
+            select_sql = """
+            SELECT value FROM sessions WHERE
+            session_type = ? AND session_id = ? AND expiry_time_ms > ?
+            """
+            txn.execute(select_sql, [session_type, session_id, ts])
+            row = txn.fetchone()
+
+            if not row:
+                raise StoreError(404, "No session")
+
+            return db_to_json(row[0])
+
+        return await self.db_pool.runInteraction(
+            "get_session",
+            _get_session,
+            session_type,
+            session_id,
+            self._clock.time_msec(),
+        )
+
+    @wrap_as_background_process("delete_expired_sessions")
+    async def _delete_expired_sessions(self) -> None:
+        """Remove sessions with expiry dates that have passed."""
+
+        def _delete_expired_sessions_txn(txn: LoggingTransaction, ts: int) -> None:
+            sql = "DELETE FROM sessions WHERE expiry_time_ms <= ?"
+            txn.execute(sql, (ts,))
+
+        await self.db_pool.runInteraction(
+            "delete_expired_sessions",
+            _delete_expired_sessions_txn,
+            self._clock.time_msec(),
+        )
diff --git a/synapse/storage/schema/main/delta/62/02session_store.sql b/synapse/storage/schema/main/delta/62/02session_store.sql
new file mode 100644
index 0000000000..535fb34c10
--- /dev/null
+++ b/synapse/storage/schema/main/delta/62/02session_store.sql
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2021 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS sessions(
+    session_type TEXT NOT NULL,  -- The unique key for this type of session.
+    session_id TEXT NOT NULL,  -- The session ID passed to the client.
+    value TEXT NOT NULL, -- A JSON dictionary to persist.
+    expiry_time_ms BIGINT NOT NULL,  -- The time this session will expire (epoch time in milliseconds).
+    UNIQUE (session_type, session_id)
+);