summary refs log tree commit diff
path: root/synapse/util/task_scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/task_scheduler.py')
-rw-r--r--synapse/util/task_scheduler.py60
1 files changed, 29 insertions, 31 deletions
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py

index 9e89aeb748..b7de201bde 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py
@@ -19,6 +19,7 @@ from prometheus_client import Gauge from twisted.python.failure import Failure +from synapse.logging.context import nested_logging_context from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import JsonMapping, ScheduledTask, TaskStatus from synapse.util.stringutils import random_string @@ -77,6 +78,7 @@ class TaskScheduler: LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs def __init__(self, hs: "HomeServer"): + self._hs = hs self._store = hs.get_datastores().main self._clock = hs.get_clock() self._running_tasks: Set[str] = set() @@ -97,8 +99,6 @@ class TaskScheduler: "handle_scheduled_tasks", self._handle_scheduled_tasks, ) - else: - self.replication_client = hs.get_replication_command_handler() def register_action( self, @@ -133,7 +133,7 @@ class TaskScheduler: params: Optional[JsonMapping] = None, ) -> str: """Schedule a new potentially resumable task. A function matching the specified - `action` should have been previously registered with `register_action`. + `action` should have be registered with `register_action` before the task is run. Args: action: the name of a previously registered action @@ -149,11 +149,6 @@ class TaskScheduler: Returns: The id of the scheduled task """ - if action not in self._actions: - raise Exception( - f"No function associated with action {action} of the scheduled task" - ) - status = TaskStatus.SCHEDULED if timestamp is None or timestamp < self._clock.time_msec(): timestamp = self._clock.time_msec() @@ -175,7 +170,7 @@ class TaskScheduler: if self._run_background_tasks: await self._launch_task(task) else: - self.replication_client.send_new_active_task(task.id) + self._hs.get_replication_command_handler().send_new_active_task(task.id) return task.id @@ -315,30 +310,34 @@ class TaskScheduler: """ assert self._run_background_tasks - assert task.action in self._actions + if task.action not in self._actions: + raise Exception( + f"No function associated with action {task.action} of the scheduled task {task.id}" + ) function = self._actions[task.action] async def wrapper() -> None: - try: - (status, result, error) = await function(task) - except Exception: - f = Failure() - logger.error( - f"scheduled task {task.id} failed", - exc_info=(f.type, f.value, f.getTracebackObject()), + with nested_logging_context(task.id): + try: + (status, result, error) = await function(task) + except Exception: + f = Failure() + logger.error( + f"scheduled task {task.id} failed", + exc_info=(f.type, f.value, f.getTracebackObject()), + ) + status = TaskStatus.FAILED + result = None + error = f.getErrorMessage() + + await self._store.update_scheduled_task( + task.id, + self._clock.time_msec(), + status=status, + result=result, + error=error, ) - status = TaskStatus.FAILED - result = None - error = f.getErrorMessage() - - await self._store.update_scheduled_task( - task.id, - self._clock.time_msec(), - status=status, - result=result, - error=error, - ) - self._running_tasks.remove(task.id) + self._running_tasks.remove(task.id) if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: return @@ -356,5 +355,4 @@ class TaskScheduler: self._running_tasks.add(task.id) await self.update_task(task.id, status=TaskStatus.ACTIVE) - description = f"{task.id}-{task.action}" - run_as_background_process(description, wrapper) + run_as_background_process(task.action, wrapper)