diff options
Diffstat (limited to 'synapse/util/task_scheduler.py')
-rw-r--r-- | synapse/util/task_scheduler.py | 100 |
1 files changed, 70 insertions, 30 deletions
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index b7de201bde..caf13b3474 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -15,12 +15,14 @@ import logging from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple -from prometheus_client import Gauge - from twisted.python.failure import Failure from synapse.logging.context import nested_logging_context -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics import LaterGauge +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.types import JsonMapping, ScheduledTask, TaskStatus from synapse.util.stringutils import random_string @@ -30,12 +32,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -running_tasks_gauge = Gauge( - "synapse_scheduler_running_tasks", - "The number of concurrent running tasks handled by the TaskScheduler", -) - - class TaskScheduler: """ This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background` @@ -70,6 +66,8 @@ class TaskScheduler: # Precision of the scheduler, evaluation of tasks to run will only happen # every `SCHEDULE_INTERVAL_MS` ms SCHEDULE_INTERVAL_MS = 1 * 60 * 1000 # 1mn + # How often to clean up old tasks. + CLEANUP_INTERVAL_MS = 30 * 60 * 1000 # Time before a complete or failed task is deleted from the DB KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week # Maximum number of tasks that can run at the same time @@ -92,14 +90,26 @@ class TaskScheduler: ] = {} self._run_background_tasks = hs.config.worker.run_background_tasks + # Flag to make sure we only try and launch new tasks once at a time. + self._launching_new_tasks = False + if self._run_background_tasks: self._clock.looping_call( - run_as_background_process, + self._launch_scheduled_tasks, + TaskScheduler.SCHEDULE_INTERVAL_MS, + ) + self._clock.looping_call( + self._clean_scheduled_tasks, TaskScheduler.SCHEDULE_INTERVAL_MS, - "handle_scheduled_tasks", - self._handle_scheduled_tasks, ) + LaterGauge( + "synapse_scheduler_running_tasks", + "The number of concurrent running tasks handled by the TaskScheduler", + labels=None, + caller=lambda: len(self._running_tasks), + ) + def register_action( self, function: Callable[ @@ -234,6 +244,7 @@ class TaskScheduler: resource_id: Optional[str] = None, statuses: Optional[List[TaskStatus]] = None, max_timestamp: Optional[int] = None, + limit: Optional[int] = None, ) -> List[ScheduledTask]: """Get a list of tasks. Returns all the tasks if no args is provided. @@ -247,6 +258,7 @@ class TaskScheduler: statuses: Limit the returned tasks to the specific statuses max_timestamp: Limit the returned tasks to the ones that have a timestamp inferior to the specified one + limit: Only return `limit` number of rows if set. Returns A list of `ScheduledTask`, ordered by increasing timestamps @@ -256,6 +268,7 @@ class TaskScheduler: resource_id=resource_id, statuses=statuses, max_timestamp=max_timestamp, + limit=limit, ) async def delete_task(self, id: str) -> None: @@ -273,34 +286,58 @@ class TaskScheduler: raise Exception(f"Task {id} is currently ACTIVE and can't be deleted") await self._store.delete_scheduled_task(id) - async def _handle_scheduled_tasks(self) -> None: - """Main loop taking care of launching tasks and cleaning up old ones.""" - await self._launch_scheduled_tasks() - await self._clean_scheduled_tasks() + def launch_task_by_id(self, id: str) -> None: + """Try launching the task with the given ID.""" + # Don't bother trying to launch new tasks if we're already at capacity. + if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: + return + + run_as_background_process("launch_task_by_id", self._launch_task_by_id, id) + + async def _launch_task_by_id(self, id: str) -> None: + """Helper async function for `launch_task_by_id`.""" + task = await self.get_task(id) + if task: + await self._launch_task(task) + @wrap_as_background_process("launch_scheduled_tasks") async def _launch_scheduled_tasks(self) -> None: """Retrieve and launch scheduled tasks that should be running at that time.""" - for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]): - await self._launch_task(task) - for task in await self.get_tasks( - statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec() - ): - await self._launch_task(task) + # Don't bother trying to launch new tasks if we're already at capacity. + if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: + return + + if self._launching_new_tasks: + return - running_tasks_gauge.set(len(self._running_tasks)) + self._launching_new_tasks = True + try: + for task in await self.get_tasks( + statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS + ): + await self._launch_task(task) + for task in await self.get_tasks( + statuses=[TaskStatus.SCHEDULED], + max_timestamp=self._clock.time_msec(), + limit=self.MAX_CONCURRENT_RUNNING_TASKS, + ): + await self._launch_task(task) + + finally: + self._launching_new_tasks = False + + @wrap_as_background_process("clean_scheduled_tasks") async def _clean_scheduled_tasks(self) -> None: """Clean old complete or failed jobs to avoid clutter the DB.""" + now = self._clock.time_msec() for task in await self._store.get_scheduled_tasks( - statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE] + statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE], + max_timestamp=now - TaskScheduler.KEEP_TASKS_FOR_MS, ): # FAILED and COMPLETE tasks should never be running assert task.id not in self._running_tasks - if ( - self._clock.time_msec() - > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS - ): - await self._store.delete_scheduled_task(task.id) + await self._store.delete_scheduled_task(task.id) async def _launch_task(self, task: ScheduledTask) -> None: """Launch a scheduled task now. @@ -339,6 +376,9 @@ class TaskScheduler: ) self._running_tasks.remove(task.id) + # Try launch a new task since we've finished with this one. + self._clock.call_later(1, self._launch_scheduled_tasks) + if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: return @@ -355,4 +395,4 @@ class TaskScheduler: self._running_tasks.add(task.id) await self.update_task(task.id, status=TaskStatus.ACTIVE) - run_as_background_process(task.action, wrapper) + run_as_background_process(f"task-{task.action}", wrapper) |