path: root/synapse/util/task_scheduler.py
Diffstat (limited to 'synapse/util/task_scheduler.py')
-rw-r--r--  synapse/util/task_scheduler.py  100
1 file changed, 70 insertions, 30 deletions
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index b7de201bde..caf13b3474 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -15,12 +15,14 @@
 import logging
 from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple
 
-from prometheus_client import Gauge
-
 from twisted.python.failure import Failure
 
 from synapse.logging.context import nested_logging_context
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.types import JsonMapping, ScheduledTask, TaskStatus
 from synapse.util.stringutils import random_string
 
@@ -30,12 +32,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-running_tasks_gauge = Gauge(
-    "synapse_scheduler_running_tasks",
-    "The number of concurrent running tasks handled by the TaskScheduler",
-)
-
-
 class TaskScheduler:
     """
     This is a simple task scheduler aimed at resumable tasks: usually we use `run_in_background`
@@ -70,6 +66,8 @@ class TaskScheduler:
     # Precision of the scheduler, evaluation of tasks to run will only happen
     # every `SCHEDULE_INTERVAL_MS` ms
     SCHEDULE_INTERVAL_MS = 1 * 60 * 1000  # 1 minute
+    # How often to clean up old tasks.
+    CLEANUP_INTERVAL_MS = 30 * 60 * 1000
     # Time before a complete or failed task is deleted from the DB
     KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000  # 1 week
     # Maximum number of tasks that can run at the same time
@@ -92,14 +90,26 @@ class TaskScheduler:
         ] = {}
         self._run_background_tasks = hs.config.worker.run_background_tasks
 
+        # Flag to make sure we only try to launch new tasks once at a time.
+        self._launching_new_tasks = False
+
         if self._run_background_tasks:
             self._clock.looping_call(
-                run_as_background_process,
+                self._launch_scheduled_tasks,
+                TaskScheduler.SCHEDULE_INTERVAL_MS,
+            )
+            self._clock.looping_call(
+                self._clean_scheduled_tasks,
                 TaskScheduler.SCHEDULE_INTERVAL_MS,
-                "handle_scheduled_tasks",
-                self._handle_scheduled_tasks,
             )
 
+        LaterGauge(
+            "synapse_scheduler_running_tasks",
+            "The number of concurrent running tasks handled by the TaskScheduler",
+            labels=None,
+            caller=lambda: len(self._running_tasks),
+        )
+
     def register_action(
         self,
         function: Callable[
@@ -234,6 +244,7 @@ class TaskScheduler:
         resource_id: Optional[str] = None,
         statuses: Optional[List[TaskStatus]] = None,
         max_timestamp: Optional[int] = None,
+        limit: Optional[int] = None,
     ) -> List[ScheduledTask]:
         """Get a list of tasks. Returns all the tasks if no args is provided.
 
@@ -247,6 +258,7 @@ class TaskScheduler:
             statuses: Limit the returned tasks to the specified statuses
             max_timestamp: Limit the returned tasks to the ones that have
                 a timestamp lower than the specified one
+            limit: Only return up to `limit` rows if set.
 
         Returns
             A list of `ScheduledTask`, ordered by increasing timestamps
@@ -256,6 +268,7 @@ class TaskScheduler:
             resource_id=resource_id,
             statuses=statuses,
             max_timestamp=max_timestamp,
+            limit=limit,
         )
 
     async def delete_task(self, id: str) -> None:
@@ -273,34 +286,58 @@ class TaskScheduler:
             raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
         await self._store.delete_scheduled_task(id)
 
-    async def _handle_scheduled_tasks(self) -> None:
-        """Main loop taking care of launching tasks and cleaning up old ones."""
-        await self._launch_scheduled_tasks()
-        await self._clean_scheduled_tasks()
+    def launch_task_by_id(self, id: str) -> None:
+        """Try launching the task with the given ID."""
+        # Don't bother trying to launch new tasks if we're already at capacity.
+        if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
+            return
+
+        run_as_background_process("launch_task_by_id", self._launch_task_by_id, id)
+
+    async def _launch_task_by_id(self, id: str) -> None:
+        """Helper async function for `launch_task_by_id`."""
+        task = await self.get_task(id)
+        if task:
+            await self._launch_task(task)
 
+    @wrap_as_background_process("launch_scheduled_tasks")
     async def _launch_scheduled_tasks(self) -> None:
         """Retrieve and launch scheduled tasks that should be running at that time."""
-        for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]):
-            await self._launch_task(task)
-        for task in await self.get_tasks(
-            statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec()
-        ):
-            await self._launch_task(task)
+        # Don't bother trying to launch new tasks if we're already at capacity.
+        if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
+            return
+
+        if self._launching_new_tasks:
+            return
 
-        running_tasks_gauge.set(len(self._running_tasks))
+        self._launching_new_tasks = True
 
+        try:
+            for task in await self.get_tasks(
+                statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS
+            ):
+                await self._launch_task(task)
+            for task in await self.get_tasks(
+                statuses=[TaskStatus.SCHEDULED],
+                max_timestamp=self._clock.time_msec(),
+                limit=self.MAX_CONCURRENT_RUNNING_TASKS,
+            ):
+                await self._launch_task(task)
+
+        finally:
+            self._launching_new_tasks = False
+
+    @wrap_as_background_process("clean_scheduled_tasks")
     async def _clean_scheduled_tasks(self) -> None:
         """Clean old complete or failed jobs to avoid clutter the DB."""
+        now = self._clock.time_msec()
         for task in await self._store.get_scheduled_tasks(
-            statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE]
+            statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE],
+            max_timestamp=now - TaskScheduler.KEEP_TASKS_FOR_MS,
         ):
             # FAILED and COMPLETE tasks should never be running
             assert task.id not in self._running_tasks
-            if (
-                self._clock.time_msec()
-                > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS
-            ):
-                await self._store.delete_scheduled_task(task.id)
+            await self._store.delete_scheduled_task(task.id)
 
     async def _launch_task(self, task: ScheduledTask) -> None:
         """Launch a scheduled task now.
@@ -339,6 +376,9 @@ class TaskScheduler:
                 )
                 self._running_tasks.remove(task.id)
 
+            # Try to launch a new task since we've finished with this one.
+            self._clock.call_later(1, self._launch_scheduled_tasks)
+
         if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
             return
 
@@ -355,4 +395,4 @@ class TaskScheduler:
 
         self._running_tasks.add(task.id)
         await self.update_task(task.id, status=TaskStatus.ACTIVE)
-        run_as_background_process(task.action, wrapper)
+        run_as_background_process(f"task-{task.action}", wrapper)
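
The metrics change swaps a push-style `Gauge` for a `LaterGauge` whose `caller` is evaluated at scrape time, so the reported running-task count can never drift out of sync with `self._running_tasks`. Outside Synapse, plain `prometheus_client` offers a similar pull-style hook via `Gauge.set_function`; the sketch below only illustrates that idea and is not Synapse code (`RUNNING_TASKS` is a hypothetical stand-in for the scheduler's task set).

    from prometheus_client import Gauge

    # Hypothetical stand-in for TaskScheduler._running_tasks.
    RUNNING_TASKS: set = set()

    running_tasks_gauge = Gauge(
        "example_scheduler_running_tasks",
        "The number of concurrently running tasks",
    )
    # Pull-style: the callback runs when the metric is scraped, so there is
    # no counter that has to be updated by hand at the end of each pass.
    running_tasks_gauge.set_function(lambda: len(RUNNING_TASKS))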
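
The new launch path combines three small patterns: an early return when `MAX_CONCURRENT_RUNNING_TASKS` is reached, the `_launching_new_tasks` flag so only one pass selects tasks at a time, and a `call_later` kick when a task finishes so a freed slot is refilled without waiting for the next `SCHEDULE_INTERVAL_MS` tick. Below is a minimal, framework-free sketch of that shape, using asyncio in place of Twisted's clock and Synapse's background-process helpers; all names in it are illustrative, not Synapse APIs.

    import asyncio
    import itertools
    from collections import deque
    from typing import Awaitable, Callable, Deque, Set


    class SketchScheduler:
        MAX_CONCURRENT = 5

        def __init__(self) -> None:
            self._running: Set[int] = set()
            self._pending: Deque[Callable[[], Awaitable[None]]] = deque()
            self._launching = False  # re-entrancy guard, like _launching_new_tasks
            self._ids = itertools.count()

        def running_tasks(self) -> int:
            # Computed on demand rather than tracked separately.
            return len(self._running)

        def schedule(self, fn: Callable[[], Awaitable[None]]) -> None:
            self._pending.append(fn)

        async def launch_scheduled(self) -> None:
            # Bail out early when at capacity, or when another pass is
            # already choosing tasks to start.
            if len(self._running) >= self.MAX_CONCURRENT or self._launching:
                return
            self._launching = True
            try:
                while self._pending and len(self._running) < self.MAX_CONCURRENT:
                    self._start(self._pending.popleft())
            finally:
                self._launching = False

        def _start(self, fn: Callable[[], Awaitable[None]]) -> None:
            task_id = next(self._ids)
            self._running.add(task_id)

            async def wrapper() -> None:
                try:
                    await fn()
                finally:
                    self._running.discard(task_id)
                    # A slot just freed up, so try to launch more queued work.
                    await self.launch_scheduled()

            asyncio.create_task(wrapper())

Synapse performs the equivalent kick with `self._clock.call_later(1, self._launch_scheduled_tasks)` rather than awaiting directly, so a completing task does not block on the next scheduling pass.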