diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 9e89aeb748..b7de201bde 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -19,6 +19,7 @@ from prometheus_client import Gauge
from twisted.python.failure import Failure
+from synapse.logging.context import nested_logging_context
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util.stringutils import random_string
@@ -77,6 +78,7 @@ class TaskScheduler:
LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
def __init__(self, hs: "HomeServer"):
+ self._hs = hs
self._store = hs.get_datastores().main
self._clock = hs.get_clock()
self._running_tasks: Set[str] = set()
@@ -97,8 +99,6 @@ class TaskScheduler:
"handle_scheduled_tasks",
self._handle_scheduled_tasks,
)
- else:
- self.replication_client = hs.get_replication_command_handler()
def register_action(
self,
@@ -133,7 +133,7 @@ class TaskScheduler:
params: Optional[JsonMapping] = None,
) -> str:
"""Schedule a new potentially resumable task. A function matching the specified
- `action` should have been previously registered with `register_action`.
+ `action` should have be registered with `register_action` before the task is run.
Args:
action: the name of a previously registered action
@@ -149,11 +149,6 @@ class TaskScheduler:
Returns:
The id of the scheduled task
"""
- if action not in self._actions:
- raise Exception(
- f"No function associated with action {action} of the scheduled task"
- )
-
status = TaskStatus.SCHEDULED
if timestamp is None or timestamp < self._clock.time_msec():
timestamp = self._clock.time_msec()
@@ -175,7 +170,7 @@ class TaskScheduler:
if self._run_background_tasks:
await self._launch_task(task)
else:
- self.replication_client.send_new_active_task(task.id)
+ self._hs.get_replication_command_handler().send_new_active_task(task.id)
return task.id
@@ -315,30 +310,34 @@ class TaskScheduler:
"""
assert self._run_background_tasks
- assert task.action in self._actions
+ if task.action not in self._actions:
+ raise Exception(
+ f"No function associated with action {task.action} of the scheduled task {task.id}"
+ )
function = self._actions[task.action]
async def wrapper() -> None:
- try:
- (status, result, error) = await function(task)
- except Exception:
- f = Failure()
- logger.error(
- f"scheduled task {task.id} failed",
- exc_info=(f.type, f.value, f.getTracebackObject()),
+ with nested_logging_context(task.id):
+ try:
+ (status, result, error) = await function(task)
+ except Exception:
+ f = Failure()
+ logger.error(
+ f"scheduled task {task.id} failed",
+ exc_info=(f.type, f.value, f.getTracebackObject()),
+ )
+ status = TaskStatus.FAILED
+ result = None
+ error = f.getErrorMessage()
+
+ await self._store.update_scheduled_task(
+ task.id,
+ self._clock.time_msec(),
+ status=status,
+ result=result,
+ error=error,
)
- status = TaskStatus.FAILED
- result = None
- error = f.getErrorMessage()
-
- await self._store.update_scheduled_task(
- task.id,
- self._clock.time_msec(),
- status=status,
- result=result,
- error=error,
- )
- self._running_tasks.remove(task.id)
+ self._running_tasks.remove(task.id)
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return
@@ -356,5 +355,4 @@ class TaskScheduler:
self._running_tasks.add(task.id)
await self.update_task(task.id, status=TaskStatus.ACTIVE)
- description = f"{task.id}-{task.action}"
- run_as_background_process(description, wrapper)
+ run_as_background_process(task.action, wrapper)
|