summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/17219.feature1
-rw-r--r--synapse/util/task_scheduler.py69
2 files changed, 68 insertions, 2 deletions
diff --git a/changelog.d/17219.feature b/changelog.d/17219.feature
new file mode 100644
index 0000000000..f8277a89d8
--- /dev/null
+++ b/changelog.d/17219.feature
@@ -0,0 +1 @@
+Add logging to tasks managed by the task scheduler, showing CPU and database usage.
\ No newline at end of file
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 01d05c9ed6..448960b297 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -24,7 +24,12 @@ from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set
 
 from twisted.python.failure import Failure
 
-from synapse.logging.context import nested_logging_context
+from synapse.logging.context import (
+    ContextResourceUsage,
+    LoggingContext,
+    nested_logging_context,
+    set_current_context,
+)
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
@@ -81,6 +86,8 @@ class TaskScheduler:
     MAX_CONCURRENT_RUNNING_TASKS = 5
     # Time from the last task update after which we will log a warning
     LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000  # 24hrs
+    # Report a running task's status and usage every so often.
+    OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000  # 5 minutes
 
     def __init__(self, hs: "HomeServer"):
         self._hs = hs
@@ -346,6 +353,33 @@ class TaskScheduler:
             assert task.id not in self._running_tasks
             await self._store.delete_scheduled_task(task.id)
 
+    @staticmethod
+    def _log_task_usage(
+        state: str, task: ScheduledTask, usage: ContextResourceUsage, active_time: float
+    ) -> None:
+        """
+        Log a line describing the state and usage of a task.
+        The log line is inspired by / a copy of the request log line format,
+        but with irrelevant fields removed.
+
+        active_time: Time that the task has been running for, in seconds.
+        """
+
+        logger.info(
+            "Task %s: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
+            " [%d dbevts] %r, %r",
+            state,
+            active_time,
+            usage.ru_utime,
+            usage.ru_stime,
+            usage.db_sched_duration_sec,
+            usage.db_txn_duration_sec,
+            int(usage.db_txn_count),
+            usage.evt_db_fetch_count,
+            task.resource_id,
+            task.params,
+        )
+
     async def _launch_task(self, task: ScheduledTask) -> None:
         """Launch a scheduled task now.
 
@@ -360,8 +394,32 @@ class TaskScheduler:
             )
         function = self._actions[task.action]
 
+        def _occasional_report(
+            task_log_context: LoggingContext, start_time: float
+        ) -> None:
+            """
+            Helper to log a 'Task continuing' line every so often.
+            """
+
+            current_time = self._clock.time()
+            calling_context = set_current_context(task_log_context)
+            try:
+                usage = task_log_context.get_resource_usage()
+                TaskScheduler._log_task_usage(
+                    "continuing", task, usage, current_time - start_time
+                )
+            finally:
+                set_current_context(calling_context)
+
         async def wrapper() -> None:
-            with nested_logging_context(task.id):
+            with nested_logging_context(task.id) as log_context:
+                start_time = self._clock.time()
+                occasional_status_call = self._clock.looping_call(
+                    _occasional_report,
+                    TaskScheduler.OCCASIONAL_REPORT_INTERVAL_MS,
+                    log_context,
+                    start_time,
+                )
                 try:
                     (status, result, error) = await function(task)
                 except Exception:
@@ -383,6 +441,13 @@ class TaskScheduler:
                 )
                 self._running_tasks.remove(task.id)
 
+                current_time = self._clock.time()
+                usage = log_context.get_resource_usage()
+                TaskScheduler._log_task_usage(
+                    status.value, task, usage, current_time - start_time
+                )
+                occasional_status_call.stop()
+
             # Try launch a new task since we've finished with this one.
             self._clock.call_later(0.1, self._launch_scheduled_tasks)