diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 448960b297..4683d09cd7 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -46,33 +46,43 @@ logger = logging.getLogger(__name__)
class TaskScheduler:
"""
- This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background`
- to launch a background task, or Twisted `deferLater` if we want to do so later on.
-
- The problem with that is that the tasks will just stop and never be resumed if synapse
- is stopped for whatever reason.
-
- How this works:
- - A function mapped to a named action should first be registered with `register_action`.
- This function will be called when trying to resuming tasks after a synapse shutdown,
- so this registration should happen when synapse is initialised, NOT right before scheduling
- a task.
- - A task can then be launched using this named action with `schedule_task`. A `params` dict
- can be passed, and it will be available to the registered function when launched. This task
- can be launch either now-ish, or later on by giving a `timestamp` parameter.
-
- The function may call `update_task` at any time to update the `result` of the task,
- and this can be used to resume the task at a specific point and/or to convey a result to
- the code launching the task.
- You can also specify the `result` (and/or an `error`) when returning from the function.
-
- The reconciliation loop runs every minute, so this is not a precise scheduler.
- There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already
- full. In this regard, please take great care that scheduled tasks can actually finished.
- For now there is no mechanism to stop a running task if it is stuck.
-
- Tasks will be run on the worker specified with `run_background_tasks_on` config,
- or the main one by default.
+ This is a simple task scheduler designed for resumable tasks. Normally,
+ you'd use `run_in_background` to start a background task or Twisted's
+ `deferLater` if you want to run it later.
+
+ The issue is that these tasks stop completely and won't resume if Synapse is
+ shut down for any reason.
+
+ Here's how it works:
+
+ - Register an Action: First, you need to register a function to a named
+ action using `register_action`. This function will be called to resume tasks
+ after a Synapse shutdown. Make sure to register it when Synapse initializes,
+ not right before scheduling the task.
+
+ - Schedule a Task: You can launch a task linked to the named action
+ using `schedule_task`. You can pass a `params` dictionary, which will be
+ passed to the registered function when it's executed. Tasks can be scheduled
+ to run either immediately or later by specifying a `timestamp`.
+
+ - Update Task: The function handling the task can call `update_task` at
+ any point to update the task's `result`. This lets you resume the task from
+ a specific point or pass results back to the code that scheduled it. When
+ the function completes, you can also return a `result` or an `error`.
+
+ Things to keep in mind:
+
+ - The reconciliation loop runs every minute, so this is not a high-precision
+ scheduler.
+
+ - Only 10 tasks can run at the same time. If the pool is full, tasks may be
+ delayed. Make sure your scheduled tasks can actually finish.
+
+ - Currently, there's no way to stop a task if it gets stuck.
+
+ - Tasks will run on the worker defined by the `run_background_tasks_on`
+ setting in your configuration. If no worker is specified, they'll run on
+ the main one by default.
"""
# Precision of the scheduler, evaluation of tasks to run will only happen
@@ -157,7 +167,7 @@ class TaskScheduler:
params: Optional[JsonMapping] = None,
) -> str:
"""Schedule a new potentially resumable task. A function matching the specified
- `action` should have be registered with `register_action` before the task is run.
+ `action` should have been registered with `register_action` before the task is run.
Args:
action: the name of a previously registered action
@@ -174,9 +184,10 @@ class TaskScheduler:
The id of the scheduled task
"""
status = TaskStatus.SCHEDULED
+ start_now = False
if timestamp is None or timestamp < self._clock.time_msec():
timestamp = self._clock.time_msec()
- status = TaskStatus.ACTIVE
+ start_now = True
task = ScheduledTask(
random_string(16),
@@ -190,9 +201,11 @@ class TaskScheduler:
)
await self._store.insert_scheduled_task(task)
- if status == TaskStatus.ACTIVE:
+ # If the task is ready to run immediately, run the scheduling algorithm now
+ # rather than waiting for the next run of the reconciliation loop.
+ if start_now:
if self._run_background_tasks:
- await self._launch_task(task)
+ self._launch_scheduled_tasks()
else:
self._hs.get_replication_command_handler().send_new_active_task(task.id)
@@ -207,15 +220,15 @@ class TaskScheduler:
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> bool:
- """Update some task associated values. This is exposed publicly so it can
- be used inside task functions, mainly to update the result and be able to
- resume a task at a specific step after a restart of synapse.
+ """Update some task-associated values. This is exposed publicly so it can
+ be used inside task functions, mainly to update the result so that the task
+ can be resumed at a specific step after a restart of Synapse.
It can also be used to stage a task, by setting the `status` to `SCHEDULED` with
a new timestamp.
- The `status` can only be set to `ACTIVE` or `SCHEDULED`, `COMPLETE` and `FAILED`
- are terminal status and can only be set by returning it in the function.
+ The `status` can only be set to `ACTIVE` or `SCHEDULED`. `COMPLETE` and `FAILED`
+ are terminal statuses and can only be set by returning them from the function.
Args:
id: the id of the task to update
@@ -223,6 +236,12 @@ class TaskScheduler:
status: the new `TaskStatus` of the task
result: the new result of the task
error: the new error of the task
+
+ Returns:
+ True if the update was successful, False otherwise.
+
+ Raises:
+ Exception: If `status` is `COMPLETE` or `FAILED` (terminal statuses).
"""
if status == TaskStatus.COMPLETE or status == TaskStatus.FAILED:
raise Exception(
@@ -260,9 +279,9 @@ class TaskScheduler:
max_timestamp: Optional[int] = None,
limit: Optional[int] = None,
) -> List[ScheduledTask]:
- """Get a list of tasks. Returns all the tasks if no args is provided.
+ """Get a list of tasks. Returns all the tasks if no args are provided.
- If an arg is `None` all tasks matching the other args will be selected.
+ If an arg is `None`, all tasks matching the other args will be selected.
If an arg is an empty list, the corresponding value of the task needs
to be `None` to be selected.
@@ -274,8 +293,8 @@ class TaskScheduler:
a timestamp inferior to the specified one
limit: Only return `limit` number of rows if set.
- Returns
- A list of `ScheduledTask`, ordered by increasing timestamps
+ Returns:
+ A list of `ScheduledTask`, ordered by increasing timestamps.
"""
return await self._store.get_scheduled_tasks(
actions=actions,
@@ -300,23 +319,13 @@ class TaskScheduler:
raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
await self._store.delete_scheduled_task(id)
- def launch_task_by_id(self, id: str) -> None:
- """Try launching the task with the given ID."""
- # Don't bother trying to launch new tasks if we're already at capacity.
- if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
- return
-
- run_as_background_process("launch_task_by_id", self._launch_task_by_id, id)
+ def on_new_task(self, task_id: str) -> None:
+ """Handle a notification that a new ready-to-run task has been added to the queue"""
+ # Just run the scheduler
+ self._launch_scheduled_tasks()
- async def _launch_task_by_id(self, id: str) -> None:
- """Helper async function for `launch_task_by_id`."""
- task = await self.get_task(id)
- if task:
- await self._launch_task(task)
-
- @wrap_as_background_process("launch_scheduled_tasks")
- async def _launch_scheduled_tasks(self) -> None:
- """Retrieve and launch scheduled tasks that should be running at that time."""
+ def _launch_scheduled_tasks(self) -> None:
+ """Retrieve and launch scheduled tasks that should be running at this time."""
# Don't bother trying to launch new tasks if we're already at capacity.
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return
@@ -326,20 +335,26 @@ class TaskScheduler:
self._launching_new_tasks = True
- try:
- for task in await self.get_tasks(
- statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS
- ):
- await self._launch_task(task)
- for task in await self.get_tasks(
- statuses=[TaskStatus.SCHEDULED],
- max_timestamp=self._clock.time_msec(),
- limit=self.MAX_CONCURRENT_RUNNING_TASKS,
- ):
- await self._launch_task(task)
-
- finally:
- self._launching_new_tasks = False
+ async def inner() -> None:
+ try:
+ for task in await self.get_tasks(
+ statuses=[TaskStatus.ACTIVE],
+ limit=self.MAX_CONCURRENT_RUNNING_TASKS,
+ ):
+ # _launch_task will ignore tasks that we're already running, and
+ # will also do nothing if we're already at the maximum capacity.
+ await self._launch_task(task)
+ for task in await self.get_tasks(
+ statuses=[TaskStatus.SCHEDULED],
+ max_timestamp=self._clock.time_msec(),
+ limit=self.MAX_CONCURRENT_RUNNING_TASKS,
+ ):
+ await self._launch_task(task)
+
+ finally:
+ self._launching_new_tasks = False
+
+ run_as_background_process("launch_scheduled_tasks", inner)
@wrap_as_background_process("clean_scheduled_tasks")
async def _clean_scheduled_tasks(self) -> None:
|