diff options
author | Mathieu Velten <mathieuv@matrix.org> | 2023-08-21 14:17:13 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-21 14:17:13 +0200 |
commit | 358896e1b835bf693ef40d4cf9f10077432e935b (patch) | |
tree | c53830870b51f7d81e320cb5ce7a1f85df357116 /synapse/storage/databases | |
parent | Bump ijson from 3.2.1 to 3.2.3 (#16143) (diff) | |
download | synapse-358896e1b835bf693ef40d4cf9f10077432e935b.tar.xz |
Implements a task scheduler for resumable potentially long running tasks (#15891)
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/task_scheduler.py | 202 |
2 files changed, 204 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index e17f25e87a..a85633efcd 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -70,6 +70,7 @@ from .state import StateStore from .stats import StatsStore from .stream import StreamWorkerStore from .tags import TagsStore +from .task_scheduler import TaskSchedulerWorkerStore from .transactions import TransactionWorkerStore from .ui_auth import UIAuthStore from .user_directory import UserDirectoryStore @@ -127,6 +128,7 @@ class DataStore( CacheInvalidationWorkerStore, LockStore, SessionStore, + TaskSchedulerWorkerStore, ): def __init__( self, diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py new file mode 100644 index 0000000000..1fb3180c3c --- /dev/null +++ b/synapse/storage/databases/main/task_scheduler.py @@ -0,0 +1,202 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + make_in_list_sql_clause, +) +from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus +from synapse.util import json_encoder + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +class TaskSchedulerWorkerStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + @staticmethod + def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask: + row["status"] = TaskStatus(row["status"]) + if row["params"] is not None: + row["params"] = db_to_json(row["params"]) + if row["result"] is not None: + row["result"] = db_to_json(row["result"]) + return ScheduledTask(**row) + + async def get_scheduled_tasks( + self, + *, + actions: Optional[List[str]] = None, + resource_id: Optional[str] = None, + statuses: Optional[List[TaskStatus]] = None, + max_timestamp: Optional[int] = None, + ) -> List[ScheduledTask]: + """Get a list of scheduled tasks from the DB. + + Args: + actions: Limit the returned tasks to those specific action names + resource_id: Limit the returned tasks to the specific resource id, if specified + statuses: Limit the returned tasks to the specific statuses + max_timestamp: Limit the returned tasks to the ones that have + a timestamp inferior to the specified one + + Returns: a list of `ScheduledTask`, ordered by increasing timestamps + """ + + def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + clauses: List[str] = [] + args: List[Any] = [] + if resource_id: + clauses.append("resource_id = ?") + args.append(resource_id) + if actions is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "action", actions + ) + clauses.append(clause) + args.extend(temp_args) + if statuses is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "status", statuses + ) + clauses.append(clause) + args.extend(temp_args) + if max_timestamp is not None: + clauses.append("timestamp <= ?") + args.append(max_timestamp) + + sql = "SELECT * FROM scheduled_tasks" + if clauses: + sql = sql + " WHERE " + " AND ".join(clauses) + + sql = sql + "ORDER BY timestamp" + + txn.execute(sql, args) + return self.db_pool.cursor_to_dict(txn) + + rows = await self.db_pool.runInteraction( + "get_scheduled_tasks", get_scheduled_tasks_txn + ) + return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows] + + async def insert_scheduled_task(self, task: ScheduledTask) -> None: + """Insert a specified `ScheduledTask` in the DB. + + Args: + task: the `ScheduledTask` to insert + """ + await self.db_pool.simple_insert( + "scheduled_tasks", + { + "id": task.id, + "action": task.action, + "status": task.status, + "timestamp": task.timestamp, + "resource_id": task.resource_id, + "params": None + if task.params is None + else json_encoder.encode(task.params), + "result": None + if task.result is None + else json_encoder.encode(task.result), + "error": task.error, + }, + desc="insert_scheduled_task", + ) + + async def update_scheduled_task( + self, + id: str, + timestamp: int, + *, + status: Optional[TaskStatus] = None, + result: Optional[JsonMapping] = None, + error: Optional[str] = None, + ) -> bool: + """Update a scheduled task in the DB with some new value(s). + + Args: + id: id of the `ScheduledTask` to update + timestamp: new timestamp of the task + status: new status of the task + result: new result of the task + error: new error of the task + + Returns: `False` if no matching row was found, `True` otherwise + """ + updatevalues: JsonDict = {"timestamp": timestamp} + if status is not None: + updatevalues["status"] = status + if result is not None: + updatevalues["result"] = json_encoder.encode(result) + if error is not None: + updatevalues["error"] = error + nb_rows = await self.db_pool.simple_update( + "scheduled_tasks", + {"id": id}, + updatevalues, + desc="update_scheduled_task", + ) + return nb_rows > 0 + + async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]: + """Get a specific `ScheduledTask` from its id. + + Args: + id: the id of the task to retrieve + + Returns: the task if available, `None` otherwise + """ + row = await self.db_pool.simple_select_one( + table="scheduled_tasks", + keyvalues={"id": id}, + retcols=( + "id", + "action", + "status", + "timestamp", + "resource_id", + "params", + "result", + "error", + ), + allow_none=True, + desc="get_scheduled_task", + ) + + return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None + + async def delete_scheduled_task(self, id: str) -> None: + """Delete a specific task from its id. + + Args: + id: the id of the task to delete + """ + await self.db_pool.simple_delete( + "scheduled_tasks", + keyvalues={"id": id}, + desc="delete_scheduled_task", + ) |