diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py
index 37c2110bbd..93b7ac8fc2 100644
--- a/synapse/storage/databases/main/task_scheduler.py
+++ b/synapse/storage/databases/main/task_scheduler.py
@@ -16,7 +16,12 @@ import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
+from synapse.storage.database import (
+ DatabasePool,
+ LoggingDatabaseConnection,
+ LoggingTransaction,
+ make_in_list_sql_clause,
+)
from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
if TYPE_CHECKING:
@@ -42,40 +47,56 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
return ScheduledTask(**row)
async def get_scheduled_tasks(
- self, action: Optional[str] = None, resource_id: Optional[str] = None
+ self,
+ actions: Optional[List[str]] = None,
+ resource_ids: Optional[List[str]] = None,
+ statuses: Optional[List[TaskStatus]] = None,
) -> List[ScheduledTask]:
"""Get a list of scheduled tasks from the DB.
- If the parameters are `None` all the tasks are returned.
+ If an arg is `None` all tasks matching the other args will be selected.
+ If an arg is an empty list, the value needs to be NULL in DB to be selected.
Args:
- action: Limit the returned tasks to this specific action name
- resource_id: Limit the returned tasks to this specific resource id
+ actions: Limit the returned tasks to those specific action names
+ resource_ids: Limit the returned tasks to those specific resource ids
+ statuses: Limit the returned tasks to those specific statuses
Returns: a list of `ScheduledTask`
"""
- keyvalues = {}
- if action:
- keyvalues["action"] = action
- if resource_id:
- keyvalues["resource_id"] = resource_id
- rows = await self.db_pool.simple_select_list(
- table="scheduled_tasks",
- keyvalues=keyvalues,
- retcols=(
- "id",
- "action",
- "status",
- "timestamp",
- "resource_id",
- "params",
- "result",
- "error",
- ),
- desc="get_scheduled_tasks",
+ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+ clauses = []
+ args = []
+ if actions is not None:
+ clause, temp_args = make_in_list_sql_clause(
+ txn.database_engine, "action", actions
+ )
+ clauses.append(clause)
+ args.extend(temp_args)
+ if resource_ids is not None:
+ clause, temp_args = make_in_list_sql_clause(
+ txn.database_engine, "resource_id", resource_ids
+ )
+ clauses.append(clause)
+ args.extend(temp_args)
+ if statuses is not None:
+ clause, temp_args = make_in_list_sql_clause(
+ txn.database_engine, "status", statuses
+ )
+ clauses.append(clause)
+ args.extend(temp_args)
+
+ sql = "SELECT * FROM scheduled_tasks"
+ if clauses:
+ sql = sql + " WHERE " + " AND ".join(clauses)
+
+ txn.execute(sql, args)
+ return self.db_pool.cursor_to_dict(txn)
+
+ rows = await self.db_pool.runInteraction(
+ "get_scheduled_tasks", get_scheduled_tasks_txn
)
-
return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]
async def upsert_scheduled_task(self, task: ScheduledTask) -> None:
@@ -107,7 +128,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
status: Optional[TaskStatus] = None,
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
- ) -> None:
+ ) -> bool:
"""Update a scheduled task in the DB with some new value(s).
Args:
@@ -126,12 +147,13 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
updatevalues["result"] = json.dumps(result)
if error is not None:
updatevalues["error"] = error
- await self.db_pool.simple_update(
+ nb_rows = await self.db_pool.simple_update(
"scheduled_tasks",
{"id": id},
updatevalues,
desc="update_scheduled_task",
)
+ return nb_rows > 0
async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
"""Get a specific `ScheduledTask` from its id.
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 2c3abaaedf..034a986ff2 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -31,6 +31,7 @@ class TaskScheduler:
# Precision of the scheduler, evaluation of tasks to run will only happen
# every `SCHEDULE_INTERVAL_MS` ms
SCHEDULE_INTERVAL_MS = 5 * 60 * 1000 # 5mn
+ CLEAN_INTERVAL_MS = 60 * 60 * 1000 # 1hr
# Time before a complete or failed task is deleted from the DB
KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week
@@ -51,8 +52,14 @@ class TaskScheduler:
self.clock.looping_call(
run_as_background_process,
TaskScheduler.SCHEDULE_INTERVAL_MS,
- "scheduled_tasks_loop",
- self._scheduled_tasks_loop,
+ "run_scheduled_tasks",
+ self._run_scheduled_tasks,
+ )
+ self.clock.looping_call(
+ run_as_background_process,
+ TaskScheduler.CLEAN_INTERVAL_MS,
+ "clean_scheduled_tasks",
+ self._clean_scheduled_tasks,
)
def register_action(
@@ -135,10 +142,11 @@ class TaskScheduler:
self,
id: str,
*,
+ timestamp: Optional[int] = None,
status: Optional[TaskStatus] = None,
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
- ) -> None:
+ ) -> bool:
"""Update some task associated values.
This is used internally, and also exposed publically so it can be used inside task functions.
@@ -150,9 +158,11 @@ class TaskScheduler:
result: the new result of the task
error: the new error of the task
"""
- await self.store.update_scheduled_task(
+ if timestamp is None:
+ timestamp = self.clock.time_msec()
+ return await self.store.update_scheduled_task(
id,
- timestamp=self.clock.time_msec(),
+ timestamp=timestamp,
status=status,
result=result,
error=error,
@@ -170,10 +180,13 @@ class TaskScheduler:
return await self.store.get_scheduled_task(id)
async def get_tasks(
- self, action: str, resource_id: Optional[str]
+ self,
+ actions: Optional[List[str]] = None,
+ resource_ids: Optional[List[str]] = None,
+ statuses: Optional[List[TaskStatus]] = None,
) -> List[ScheduledTask]:
- """Get a list of tasks associated with an action name, and
- optionally with a resource id.
+ """Get a list of tasks, optionally filtered by action name(s),
+ resource id(s) and/or status(es).
Args:
action: the action name of the tasks to retrieve
@@ -182,11 +195,13 @@ class TaskScheduler:
Returns: a list of `ScheduledTask`
"""
- return await self.store.get_scheduled_tasks(action, resource_id)
+ return await self.store.get_scheduled_tasks(actions, resource_ids, statuses)
- async def _scheduled_tasks_loop(self) -> None:
+ async def _run_scheduled_tasks(self) -> None:
"""Main loop taking care of launching the scheduled tasks when needed."""
- for task in await self.store.get_scheduled_tasks():
+ for task in await self.store.get_scheduled_tasks(
+ statuses=[TaskStatus.SCHEDULED, TaskStatus.ACTIVE]
+ ):
if task.id not in self.running_tasks:
if (
task.status == TaskStatus.SCHEDULED
@@ -195,7 +210,14 @@ class TaskScheduler:
await self._launch_task(task, True)
elif task.status == TaskStatus.ACTIVE:
await self._launch_task(task, False)
- elif (
+
+ async def _clean_scheduled_tasks(self) -> None:
+ """Clean loop taking care of removing old complete or failed jobs to avoid cluttering the DB."""
+ for task in await self.store.get_scheduled_tasks(
+ statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE]
+ ):
+ if task.id not in self.running_tasks:
+ if (
task.status == TaskStatus.COMPLETE
or task.status == TaskStatus.FAILED
) and self.clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS:
|