diff options
author | Mathieu Velten <mathieuv@matrix.org> | 2023-07-24 21:50:02 +0200 |
---|---|---|
committer | Mathieu Velten <mathieuv@matrix.org> | 2023-07-24 21:53:13 +0200 |
commit | 0961f52c57184083d823b2b98f2b2dd1c23c8eee (patch) | |
tree | dc99aaebe42e47c6bfea940adb2d6e2b8821696a | |
parent | Implements a task scheduler for resumable potentially long running tasks (diff) | |
download | synapse-0961f52c57184083d823b2b98f2b2dd1c23c8eee.tar.xz |
Add filters to task retrieval + clean less often
-rw-r--r-- | synapse/storage/databases/main/task_scheduler.py | 76 | ||||
-rw-r--r-- | synapse/util/task_scheduler.py | 46 |
2 files changed, 83 insertions, 39 deletions
diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index 37c2110bbd..93b7ac8fc2 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -16,7 +16,12 @@ import json from typing import TYPE_CHECKING, Any, Dict, List, Optional from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + make_in_list_sql_clause, +) from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus if TYPE_CHECKING: @@ -42,40 +47,56 @@ class TaskSchedulerWorkerStore(SQLBaseStore): return ScheduledTask(**row) async def get_scheduled_tasks( - self, action: Optional[str] = None, resource_id: Optional[str] = None + self, + actions: Optional[List[str]] = None, + resource_ids: Optional[List[str]] = None, + statuses: Optional[List[TaskStatus]] = None, ) -> List[ScheduledTask]: """Get a list of scheduled tasks from the DB. - If the parameters are `None` all the tasks are returned. + If an arg is `None` all tasks matching the other args will be selected. + If an arg is an empty list, the value needs to be NULL in DB to be selected. Args: - action: Limit the returned tasks to this specific action name - resource_id: Limit the returned tasks to this specific resource id + actions: Limit the returned tasks to those specific action names + resource_ids: Limit the returned tasks to thoe specific resource ids + statuses: Limit the returned tasks to thoe specific statuses Returns: a list of `ScheduledTask` """ - keyvalues = {} - if action: - keyvalues["action"] = action - if resource_id: - keyvalues["resource_id"] = resource_id - rows = await self.db_pool.simple_select_list( - table="scheduled_tasks", - keyvalues=keyvalues, - retcols=( - "id", - "action", - "status", - "timestamp", - "resource_id", - "params", - "result", - "error", - ), - desc="get_scheduled_tasks", + def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + clauses = [] + args = [] + if actions is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "action", actions + ) + clauses.append(clause) + args.extend(temp_args) + if resource_ids is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "resource_id", resource_ids + ) + clauses.append(clause) + args.extend(temp_args) + if statuses is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "status", statuses + ) + clauses.append(clause) + args.extend(temp_args) + + sql = "SELECT * FROM scheduled_tasks" + if clauses: + sql = sql + " WHERE " + " AND ".join(clauses) + + txn.execute(sql, args) + return self.db_pool.cursor_to_dict(txn) + + rows = await self.db_pool.runInteraction( + "get_scheduled_tasks", get_scheduled_tasks_txn ) - return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows] async def upsert_scheduled_task(self, task: ScheduledTask) -> None: @@ -107,7 +128,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore): status: Optional[TaskStatus] = None, result: Optional[JsonMapping] = None, error: Optional[str] = None, - ) -> None: + ) -> bool: """Update a scheduled task in the DB with some new value(s). Args: @@ -126,12 +147,13 @@ class TaskSchedulerWorkerStore(SQLBaseStore): updatevalues["result"] = json.dumps(result) if error is not None: updatevalues["error"] = error - await self.db_pool.simple_update( + nb_rows = await self.db_pool.simple_update( "scheduled_tasks", {"id": id}, updatevalues, desc="update_scheduled_task", ) + return nb_rows > 0 async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]: """Get a specific `ScheduledTask` from its id. diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 2c3abaaedf..034a986ff2 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -31,6 +31,7 @@ class TaskScheduler: # Precision of the scheduler, evaluation of tasks to run will only happen # every `SCHEDULE_INTERVAL_MS` ms SCHEDULE_INTERVAL_MS = 5 * 60 * 1000 # 5mn + CLEAN_INTERVAL_MS = 60 * 60 * 1000 # 1hr # Time before a complete or failed task is deleted from the DB KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week @@ -51,8 +52,14 @@ class TaskScheduler: self.clock.looping_call( run_as_background_process, TaskScheduler.SCHEDULE_INTERVAL_MS, - "scheduled_tasks_loop", - self._scheduled_tasks_loop, + "run_scheduled_tasks", + self._run_scheduled_tasks, + ) + self.clock.looping_call( + run_as_background_process, + TaskScheduler.CLEAN_INTERVAL_MS, + "clean_scheduled_tasks", + self._clean_scheduled_tasks, ) def register_action( @@ -135,10 +142,11 @@ class TaskScheduler: self, id: str, *, + timestamp: Optional[int] = None, status: Optional[TaskStatus] = None, result: Optional[JsonMapping] = None, error: Optional[str] = None, - ) -> None: + ) -> bool: """Update some task associated values. This is used internally, and also exposed publically so it can be used inside task functions. @@ -150,9 +158,11 @@ class TaskScheduler: result: the new result of the task error: the new error of the task """ - await self.store.update_scheduled_task( + if timestamp is None: + timestamp = self.clock.time_msec() + return await self.store.update_scheduled_task( id, - timestamp=self.clock.time_msec(), + timestamp=timestamp, status=status, result=result, error=error, @@ -170,10 +180,13 @@ class TaskScheduler: return await self.store.get_scheduled_task(id) async def get_tasks( - self, action: str, resource_id: Optional[str] + self, + actions: Optional[List[str]] = None, + resource_ids: Optional[List[str]] = None, + statuses: Optional[List[TaskStatus]] = None, ) -> List[ScheduledTask]: - """Get a list of tasks associated with an action name, and - optionally with a resource id. + """Get a list of tasks associated with some action name(s) and/or + with some resource id(s). Args: action: the action name of the tasks to retrieve @@ -182,11 +195,13 @@ class TaskScheduler: Returns: a list of `ScheduledTask` """ - return await self.store.get_scheduled_tasks(action, resource_id) + return await self.store.get_scheduled_tasks(actions, resource_ids, statuses) - async def _scheduled_tasks_loop(self) -> None: + async def _run_scheduled_tasks(self) -> None: """Main loop taking care of launching the scheduled tasks when needed.""" - for task in await self.store.get_scheduled_tasks(): + for task in await self.store.get_scheduled_tasks( + statuses=[TaskStatus.SCHEDULED, TaskStatus.ACTIVE] + ): if task.id not in self.running_tasks: if ( task.status == TaskStatus.SCHEDULED @@ -195,7 +210,14 @@ class TaskScheduler: await self._launch_task(task, True) elif task.status == TaskStatus.ACTIVE: await self._launch_task(task, False) - elif ( + + async def _clean_scheduled_tasks(self) -> None: + """Clean loop taking care of removing old complete or failed jobs to avoid clutter the DB.""" + for task in await self.store.get_scheduled_tasks( + statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE] + ): + if task.id not in self.running_tasks: + if ( task.status == TaskStatus.COMPLETE or task.status == TaskStatus.FAILED ) and self.clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS: |