summary refs log tree commit diff
path: root/synapse/util/task_scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/util/task_scheduler.py155
1 files changed, 85 insertions, 70 deletions
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py

index 448960b297..4683d09cd7 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py
@@ -46,33 +46,43 @@ logger = logging.getLogger(__name__) class TaskScheduler: """ - This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background` - to launch a background task, or Twisted `deferLater` if we want to do so later on. - - The problem with that is that the tasks will just stop and never be resumed if synapse - is stopped for whatever reason. - - How this works: - - A function mapped to a named action should first be registered with `register_action`. - This function will be called when trying to resuming tasks after a synapse shutdown, - so this registration should happen when synapse is initialised, NOT right before scheduling - a task. - - A task can then be launched using this named action with `schedule_task`. A `params` dict - can be passed, and it will be available to the registered function when launched. This task - can be launch either now-ish, or later on by giving a `timestamp` parameter. - - The function may call `update_task` at any time to update the `result` of the task, - and this can be used to resume the task at a specific point and/or to convey a result to - the code launching the task. - You can also specify the `result` (and/or an `error`) when returning from the function. - - The reconciliation loop runs every minute, so this is not a precise scheduler. - There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already - full. In this regard, please take great care that scheduled tasks can actually finished. - For now there is no mechanism to stop a running task if it is stuck. - - Tasks will be run on the worker specified with `run_background_tasks_on` config, - or the main one by default. + This is a simple task scheduler designed for resumable tasks. Normally, + you'd use `run_in_background` to start a background task or Twisted's + `deferLater` if you want to run it later. + + The issue is that these tasks stop completely and won't resume if Synapse is + shut down for any reason. + + Here's how it works: + + - Register an Action: First, you need to register a function to a named + action using `register_action`. This function will be called to resume tasks + after a Synapse shutdown. Make sure to register it when Synapse initializes, + not right before scheduling the task. + + - Schedule a Task: You can launch a task linked to the named action + using `schedule_task`. You can pass a `params` dictionary, which will be + passed to the registered function when it's executed. Tasks can be scheduled + to run either immediately or later by specifying a `timestamp`. + + - Update Task: The function handling the task can call `update_task` at + any point to update the task's `result`. This lets you resume the task from + a specific point or pass results back to the code that scheduled it. When + the function completes, you can also return a `result` or an `error`. + + Things to keep in mind: + + - The reconciliation loop runs every minute, so this is not a high-precision + scheduler. + + - Only 10 tasks can run at the same time. If the pool is full, tasks may be + delayed. Make sure your scheduled tasks can actually finish. + + - Currently, there's no way to stop a task if it gets stuck. + + - Tasks will run on the worker defined by the `run_background_tasks_on` + setting in your configuration. If no worker is specified, they'll run on + the main one by default. """ # Precision of the scheduler, evaluation of tasks to run will only happen @@ -157,7 +167,7 @@ class TaskScheduler: params: Optional[JsonMapping] = None, ) -> str: """Schedule a new potentially resumable task. A function matching the specified - `action` should have be registered with `register_action` before the task is run. + `action` should've been registered with `register_action` before the task is run. Args: action: the name of a previously registered action @@ -174,9 +184,10 @@ class TaskScheduler: The id of the scheduled task """ status = TaskStatus.SCHEDULED + start_now = False if timestamp is None or timestamp < self._clock.time_msec(): timestamp = self._clock.time_msec() - status = TaskStatus.ACTIVE + start_now = True task = ScheduledTask( random_string(16), @@ -190,9 +201,11 @@ class TaskScheduler: ) await self._store.insert_scheduled_task(task) - if status == TaskStatus.ACTIVE: + # If the task is ready to run immediately, run the scheduling algorithm now + # rather than waiting + if start_now: if self._run_background_tasks: - await self._launch_task(task) + self._launch_scheduled_tasks() else: self._hs.get_replication_command_handler().send_new_active_task(task.id) @@ -207,15 +220,15 @@ class TaskScheduler: result: Optional[JsonMapping] = None, error: Optional[str] = None, ) -> bool: - """Update some task associated values. This is exposed publicly so it can - be used inside task functions, mainly to update the result and be able to - resume a task at a specific step after a restart of synapse. + """Update some task-associated values. This is exposed publicly so it can + be used inside task functions, mainly to update the result or resume + a task at a specific step after a restart of synapse. It can also be used to stage a task, by setting the `status` to `SCHEDULED` with a new timestamp. - The `status` can only be set to `ACTIVE` or `SCHEDULED`, `COMPLETE` and `FAILED` - are terminal status and can only be set by returning it in the function. + The `status` can only be set to `ACTIVE` or `SCHEDULED`. `COMPLETE` and `FAILED` + are terminal statuses and can only be set by returning them from the function. Args: id: the id of the task to update @@ -223,6 +236,12 @@ class TaskScheduler: status: the new `TaskStatus` of the task result: the new result of the task error: the new error of the task + + Returns: + True if the update was successful, False otherwise. + + Raises: + Exception: If a status other than `ACTIVE` or `SCHEDULED` was passed. """ if status == TaskStatus.COMPLETE or status == TaskStatus.FAILED: raise Exception( @@ -260,9 +279,9 @@ class TaskScheduler: max_timestamp: Optional[int] = None, limit: Optional[int] = None, ) -> List[ScheduledTask]: - """Get a list of tasks. Returns all the tasks if no args is provided. + """Get a list of tasks. Returns all the tasks if no args are provided. - If an arg is `None` all tasks matching the other args will be selected. + If an arg is `None`, all tasks matching the other args will be selected. If an arg is an empty list, the corresponding value of the task needs to be `None` to be selected. @@ -274,8 +293,8 @@ class TaskScheduler: a timestamp inferior to the specified one limit: Only return `limit` number of rows if set. - Returns - A list of `ScheduledTask`, ordered by increasing timestamps + Returns: + A list of `ScheduledTask`, ordered by increasing timestamps. """ return await self._store.get_scheduled_tasks( actions=actions, @@ -300,23 +319,13 @@ class TaskScheduler: raise Exception(f"Task {id} is currently ACTIVE and can't be deleted") await self._store.delete_scheduled_task(id) - def launch_task_by_id(self, id: str) -> None: - """Try launching the task with the given ID.""" - # Don't bother trying to launch new tasks if we're already at capacity. - if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: - return - - run_as_background_process("launch_task_by_id", self._launch_task_by_id, id) + def on_new_task(self, task_id: str) -> None: + """Handle a notification that a new ready-to-run task has been added to the queue""" + # Just run the scheduler + self._launch_scheduled_tasks() - async def _launch_task_by_id(self, id: str) -> None: - """Helper async function for `launch_task_by_id`.""" - task = await self.get_task(id) - if task: - await self._launch_task(task) - - @wrap_as_background_process("launch_scheduled_tasks") - async def _launch_scheduled_tasks(self) -> None: - """Retrieve and launch scheduled tasks that should be running at that time.""" + def _launch_scheduled_tasks(self) -> None: + """Retrieve and launch scheduled tasks that should be running at this time.""" # Don't bother trying to launch new tasks if we're already at capacity. if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: return @@ -326,20 +335,26 @@ class TaskScheduler: self._launching_new_tasks = True - try: - for task in await self.get_tasks( - statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS - ): - await self._launch_task(task) - for task in await self.get_tasks( - statuses=[TaskStatus.SCHEDULED], - max_timestamp=self._clock.time_msec(), - limit=self.MAX_CONCURRENT_RUNNING_TASKS, - ): - await self._launch_task(task) - - finally: - self._launching_new_tasks = False + async def inner() -> None: + try: + for task in await self.get_tasks( + statuses=[TaskStatus.ACTIVE], + limit=self.MAX_CONCURRENT_RUNNING_TASKS, + ): + # _launch_task will ignore tasks that we're already running, and + # will also do nothing if we're already at the maximum capacity. + await self._launch_task(task) + for task in await self.get_tasks( + statuses=[TaskStatus.SCHEDULED], + max_timestamp=self._clock.time_msec(), + limit=self.MAX_CONCURRENT_RUNNING_TASKS, + ): + await self._launch_task(task) + + finally: + self._launching_new_tasks = False + + run_as_background_process("launch_scheduled_tasks", inner) @wrap_as_background_process("clean_scheduled_tasks") async def _clean_scheduled_tasks(self) -> None: