summary refs log tree commit diff
diff options
context:
space:
mode:
authorMathieu Velten <mathieuv@matrix.org>2023-07-24 21:54:40 +0200
committerMathieu Velten <mathieuv@matrix.org>2023-07-24 21:54:40 +0200
commitc8b8c96b6e9d819608abca6fdb7c22f6ba074a29 (patch)
tree6ca40a5d05398545e45151030ffe781095499c29
parentMerge branch 'mv/task-scheduler' into mv/purge-room-when-forgotten (diff)
parentAdd filters to task retrieval + clean less often (diff)
downloadsynapse-c8b8c96b6e9d819608abca6fdb7c22f6ba074a29.tar.xz
Merge branch 'mv/task-scheduler' into mv/purge-room-when-forgotten-wip
-rw-r--r--synapse/storage/databases/main/task_scheduler.py76
-rw-r--r--synapse/util/task_scheduler.py46
2 files changed, 83 insertions, 39 deletions
diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py
index 37c2110bbd..93b7ac8fc2 100644
--- a/synapse/storage/databases/main/task_scheduler.py
+++ b/synapse/storage/databases/main/task_scheduler.py
@@ -16,7 +16,12 @@ import json
 from typing import TYPE_CHECKING, Any, Dict, List, Optional
 
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
+from synapse.storage.database import (
+    DatabasePool,
+    LoggingDatabaseConnection,
+    LoggingTransaction,
+    make_in_list_sql_clause,
+)
 from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
 
 if TYPE_CHECKING:
@@ -42,40 +47,56 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
         return ScheduledTask(**row)
 
     async def get_scheduled_tasks(
-        self, action: Optional[str] = None, resource_id: Optional[str] = None
+        self,
+        actions: Optional[List[str]] = None,
+        resource_ids: Optional[List[str]] = None,
+        statuses: Optional[List[TaskStatus]] = None,
     ) -> List[ScheduledTask]:
         """Get a list of scheduled tasks from the DB.
 
-        If the parameters are `None` all the tasks are returned.
+        If an arg is `None` all tasks matching the other args will be selected.
+        If an arg is an empty list, the value needs to be NULL in DB to be selected.
 
         Args:
-            action: Limit the returned tasks to this specific action name
-            resource_id: Limit the returned tasks to this specific resource id
+            actions: Limit the returned tasks to those specific action names
+            resource_ids: Limit the returned tasks to thoe specific resource ids
+            statuses: Limit the returned tasks to thoe specific statuses
 
         Returns: a list of `ScheduledTask`
         """
-        keyvalues = {}
-        if action:
-            keyvalues["action"] = action
-        if resource_id:
-            keyvalues["resource_id"] = resource_id
 
-        rows = await self.db_pool.simple_select_list(
-            table="scheduled_tasks",
-            keyvalues=keyvalues,
-            retcols=(
-                "id",
-                "action",
-                "status",
-                "timestamp",
-                "resource_id",
-                "params",
-                "result",
-                "error",
-            ),
-            desc="get_scheduled_tasks",
+        def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+            clauses = []
+            args = []
+            if actions is not None:
+                clause, temp_args = make_in_list_sql_clause(
+                    txn.database_engine, "action", actions
+                )
+                clauses.append(clause)
+                args.extend(temp_args)
+            if resource_ids is not None:
+                clause, temp_args = make_in_list_sql_clause(
+                    txn.database_engine, "resource_id", resource_ids
+                )
+                clauses.append(clause)
+                args.extend(temp_args)
+            if statuses is not None:
+                clause, temp_args = make_in_list_sql_clause(
+                    txn.database_engine, "status", statuses
+                )
+                clauses.append(clause)
+                args.extend(temp_args)
+
+            sql = "SELECT * FROM scheduled_tasks"
+            if clauses:
+                sql = sql + " WHERE " + " AND ".join(clauses)
+
+            txn.execute(sql, args)
+            return self.db_pool.cursor_to_dict(txn)
+
+        rows = await self.db_pool.runInteraction(
+            "get_scheduled_tasks", get_scheduled_tasks_txn
         )
-
         return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]
 
     async def upsert_scheduled_task(self, task: ScheduledTask) -> None:
@@ -107,7 +128,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
         status: Optional[TaskStatus] = None,
         result: Optional[JsonMapping] = None,
         error: Optional[str] = None,
-    ) -> None:
+    ) -> bool:
         """Update a scheduled task in the DB with some new value(s).
 
         Args:
@@ -126,12 +147,13 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
             updatevalues["result"] = json.dumps(result)
         if error is not None:
             updatevalues["error"] = error
-        await self.db_pool.simple_update(
+        nb_rows = await self.db_pool.simple_update(
             "scheduled_tasks",
             {"id": id},
             updatevalues,
             desc="update_scheduled_task",
         )
+        return nb_rows > 0
 
     async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
         """Get a specific `ScheduledTask` from its id.
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 2c3abaaedf..034a986ff2 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -31,6 +31,7 @@ class TaskScheduler:
     # Precision of the scheduler, evaluation of tasks to run will only happen
     # every `SCHEDULE_INTERVAL_MS` ms
     SCHEDULE_INTERVAL_MS = 5 * 60 * 1000  # 5mn
+    CLEAN_INTERVAL_MS = 60 * 60 * 1000  # 1hr
     # Time before a complete or failed task is deleted from the DB
     KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000  # 1 week
 
@@ -51,8 +52,14 @@ class TaskScheduler:
             self.clock.looping_call(
                 run_as_background_process,
                 TaskScheduler.SCHEDULE_INTERVAL_MS,
-                "scheduled_tasks_loop",
-                self._scheduled_tasks_loop,
+                "run_scheduled_tasks",
+                self._run_scheduled_tasks,
+            )
+            self.clock.looping_call(
+                run_as_background_process,
+                TaskScheduler.CLEAN_INTERVAL_MS,
+                "clean_scheduled_tasks",
+                self._clean_scheduled_tasks,
             )
 
     def register_action(
@@ -135,10 +142,11 @@ class TaskScheduler:
         self,
         id: str,
         *,
+        timestamp: Optional[int] = None,
         status: Optional[TaskStatus] = None,
         result: Optional[JsonMapping] = None,
         error: Optional[str] = None,
-    ) -> None:
+    ) -> bool:
         """Update some task associated values.
 
         This is used internally, and also exposed publically so it can be used inside task functions.
@@ -150,9 +158,11 @@ class TaskScheduler:
             result: the new result of the task
             error: the new error of the task
         """
-        await self.store.update_scheduled_task(
+        if timestamp is None:
+            timestamp = self.clock.time_msec()
+        return await self.store.update_scheduled_task(
             id,
-            timestamp=self.clock.time_msec(),
+            timestamp=timestamp,
             status=status,
             result=result,
             error=error,
@@ -170,10 +180,13 @@ class TaskScheduler:
         return await self.store.get_scheduled_task(id)
 
     async def get_tasks(
-        self, action: str, resource_id: Optional[str]
+        self,
+        actions: Optional[List[str]] = None,
+        resource_ids: Optional[List[str]] = None,
+        statuses: Optional[List[TaskStatus]] = None,
     ) -> List[ScheduledTask]:
-        """Get a list of tasks associated with an action name, and
-        optionally with a resource id.
+        """Get a list of tasks associated with some action name(s) and/or
+        with some resource id(s).
 
         Args:
             action: the action name of the tasks to retrieve
@@ -182,11 +195,13 @@ class TaskScheduler:
 
         Returns: a list of `ScheduledTask`
         """
-        return await self.store.get_scheduled_tasks(action, resource_id)
+        return await self.store.get_scheduled_tasks(actions, resource_ids, statuses)
 
-    async def _scheduled_tasks_loop(self) -> None:
+    async def _run_scheduled_tasks(self) -> None:
         """Main loop taking care of launching the scheduled tasks when needed."""
-        for task in await self.store.get_scheduled_tasks():
+        for task in await self.store.get_scheduled_tasks(
+            statuses=[TaskStatus.SCHEDULED, TaskStatus.ACTIVE]
+        ):
             if task.id not in self.running_tasks:
                 if (
                     task.status == TaskStatus.SCHEDULED
@@ -195,7 +210,14 @@ class TaskScheduler:
                     await self._launch_task(task, True)
                 elif task.status == TaskStatus.ACTIVE:
                     await self._launch_task(task, False)
-                elif (
+
+    async def _clean_scheduled_tasks(self) -> None:
+        """Clean loop taking care of removing old complete or failed jobs to avoid clutter the DB."""
+        for task in await self.store.get_scheduled_tasks(
+            statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE]
+        ):
+            if task.id not in self.running_tasks:
+                if (
                     task.status == TaskStatus.COMPLETE
                     or task.status == TaskStatus.FAILED
                 ) and self.clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS: