diff options
author | Mathieu Velten <mathieuv@matrix.org> | 2023-08-21 14:17:13 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-21 14:17:13 +0200 |
commit | 358896e1b835bf693ef40d4cf9f10077432e935b (patch) | |
tree | c53830870b51f7d81e320cb5ce7a1f85df357116 /synapse/storage | |
parent | Bump ijson from 3.2.1 to 3.2.3 (#16143) (diff) | |
download | synapse-358896e1b835bf693ef40d4cf9f10077432e935b.tar.xz |
Implements a task scheduler for resumable potentially long running tasks (#15891)
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/task_scheduler.py | 202 | ||||
-rw-r--r-- | synapse/storage/schema/__init__.py | 1 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql | 28 |
4 files changed, 233 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index e17f25e87a..a85633efcd 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -70,6 +70,7 @@ from .state import StateStore from .stats import StatsStore from .stream import StreamWorkerStore from .tags import TagsStore +from .task_scheduler import TaskSchedulerWorkerStore from .transactions import TransactionWorkerStore from .ui_auth import UIAuthStore from .user_directory import UserDirectoryStore @@ -127,6 +128,7 @@ class DataStore( CacheInvalidationWorkerStore, LockStore, SessionStore, + TaskSchedulerWorkerStore, ): def __init__( self, diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py new file mode 100644 index 0000000000..1fb3180c3c --- /dev/null +++ b/synapse/storage/databases/main/task_scheduler.py @@ -0,0 +1,202 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + make_in_list_sql_clause, +) +from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus +from synapse.util import json_encoder + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +class TaskSchedulerWorkerStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + @staticmethod + def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask: + row["status"] = TaskStatus(row["status"]) + if row["params"] is not None: + row["params"] = db_to_json(row["params"]) + if row["result"] is not None: + row["result"] = db_to_json(row["result"]) + return ScheduledTask(**row) + + async def get_scheduled_tasks( + self, + *, + actions: Optional[List[str]] = None, + resource_id: Optional[str] = None, + statuses: Optional[List[TaskStatus]] = None, + max_timestamp: Optional[int] = None, + ) -> List[ScheduledTask]: + """Get a list of scheduled tasks from the DB. + + Args: + actions: Limit the returned tasks to those specific action names + resource_id: Limit the returned tasks to the specific resource id, if specified + statuses: Limit the returned tasks to the specific statuses + max_timestamp: Limit the returned tasks to the ones that have + a timestamp inferior to the specified one + + Returns: a list of `ScheduledTask`, ordered by increasing timestamps + """ + + def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + clauses: List[str] = [] + args: List[Any] = [] + if resource_id: + clauses.append("resource_id = ?") + args.append(resource_id) + if actions is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "action", actions + ) + clauses.append(clause) + args.extend(temp_args) + if statuses is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "status", statuses + ) + clauses.append(clause) + args.extend(temp_args) + if max_timestamp is not None: + clauses.append("timestamp <= ?") + args.append(max_timestamp) + + sql = "SELECT * FROM scheduled_tasks" + if clauses: + sql = sql + " WHERE " + " AND ".join(clauses) + + sql = sql + "ORDER BY timestamp" + + txn.execute(sql, args) + return self.db_pool.cursor_to_dict(txn) + + rows = await self.db_pool.runInteraction( + "get_scheduled_tasks", get_scheduled_tasks_txn + ) + return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows] + + async def insert_scheduled_task(self, task: ScheduledTask) -> None: + """Insert a specified `ScheduledTask` in the DB. + + Args: + task: the `ScheduledTask` to insert + """ + await self.db_pool.simple_insert( + "scheduled_tasks", + { + "id": task.id, + "action": task.action, + "status": task.status, + "timestamp": task.timestamp, + "resource_id": task.resource_id, + "params": None + if task.params is None + else json_encoder.encode(task.params), + "result": None + if task.result is None + else json_encoder.encode(task.result), + "error": task.error, + }, + desc="insert_scheduled_task", + ) + + async def update_scheduled_task( + self, + id: str, + timestamp: int, + *, + status: Optional[TaskStatus] = None, + result: Optional[JsonMapping] = None, + error: Optional[str] = None, + ) -> bool: + """Update a scheduled task in the DB with some new value(s). + + Args: + id: id of the `ScheduledTask` to update + timestamp: new timestamp of the task + status: new status of the task + result: new result of the task + error: new error of the task + + Returns: `False` if no matching row was found, `True` otherwise + """ + updatevalues: JsonDict = {"timestamp": timestamp} + if status is not None: + updatevalues["status"] = status + if result is not None: + updatevalues["result"] = json_encoder.encode(result) + if error is not None: + updatevalues["error"] = error + nb_rows = await self.db_pool.simple_update( + "scheduled_tasks", + {"id": id}, + updatevalues, + desc="update_scheduled_task", + ) + return nb_rows > 0 + + async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]: + """Get a specific `ScheduledTask` from its id. + + Args: + id: the id of the task to retrieve + + Returns: the task if available, `None` otherwise + """ + row = await self.db_pool.simple_select_one( + table="scheduled_tasks", + keyvalues={"id": id}, + retcols=( + "id", + "action", + "status", + "timestamp", + "resource_id", + "params", + "result", + "error", + ), + allow_none=True, + desc="get_scheduled_task", + ) + + return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None + + async def delete_scheduled_task(self, id: str) -> None: + """Delete a specific task from its id. + + Args: + id: the id of the task to delete + """ + await self.db_pool.simple_delete( + "scheduled_tasks", + keyvalues={"id": id}, + desc="delete_scheduled_task", + ) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 7de9949a5b..649d3c8e9f 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -113,6 +113,7 @@ Changes in SCHEMA_VERSION = 79 Changes in SCHEMA_VERSION = 80 - The event_txn_id_device_id is always written to for new events. + - Add tables for the task scheduler. """ diff --git a/synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql b/synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql new file mode 100644 index 0000000000..286d109ed7 --- /dev/null +++ b/synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql @@ -0,0 +1,28 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- cf ScheduledTask docstring for the meaning of the fields. +CREATE TABLE IF NOT EXISTS scheduled_tasks( + id TEXT PRIMARY KEY, + action TEXT NOT NULL, + status TEXT NOT NULL, + timestamp BIGINT NOT NULL, + resource_id TEXT, + params TEXT, + result TEXT, + error TEXT +); + +CREATE INDEX IF NOT EXISTS scheduled_tasks_status ON scheduled_tasks(status); |