summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/16278.misc1
-rw-r--r--synapse/util/task_scheduler.py43
2 files changed, 23 insertions, 21 deletions
diff --git a/changelog.d/16278.misc b/changelog.d/16278.misc
new file mode 100644
index 0000000000..e82a470c45
--- /dev/null
+++ b/changelog.d/16278.misc
@@ -0,0 +1 @@
+Fix using the new task scheduler causing lots of CPU to be used.
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 9b2581e51a..b7de201bde 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -19,6 +19,7 @@ from prometheus_client import Gauge
 
 from twisted.python.failure import Failure
 
+from synapse.logging.context import nested_logging_context
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import JsonMapping, ScheduledTask, TaskStatus
 from synapse.util.stringutils import random_string
@@ -316,26 +317,27 @@ class TaskScheduler:
         function = self._actions[task.action]
 
         async def wrapper() -> None:
-            try:
-                (status, result, error) = await function(task)
-            except Exception:
-                f = Failure()
-                logger.error(
-                    f"scheduled task {task.id} failed",
-                    exc_info=(f.type, f.value, f.getTracebackObject()),
+            with nested_logging_context(task.id):
+                try:
+                    (status, result, error) = await function(task)
+                except Exception:
+                    f = Failure()
+                    logger.error(
+                        f"scheduled task {task.id} failed",
+                        exc_info=(f.type, f.value, f.getTracebackObject()),
+                    )
+                    status = TaskStatus.FAILED
+                    result = None
+                    error = f.getErrorMessage()
+
+                await self._store.update_scheduled_task(
+                    task.id,
+                    self._clock.time_msec(),
+                    status=status,
+                    result=result,
+                    error=error,
                 )
-                status = TaskStatus.FAILED
-                result = None
-                error = f.getErrorMessage()
-
-            await self._store.update_scheduled_task(
-                task.id,
-                self._clock.time_msec(),
-                status=status,
-                result=result,
-                error=error,
-            )
-            self._running_tasks.remove(task.id)
+                self._running_tasks.remove(task.id)
 
         if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
             return
@@ -353,5 +355,4 @@ class TaskScheduler:
 
         self._running_tasks.add(task.id)
         await self.update_task(task.id, status=TaskStatus.ACTIVE)
-        description = f"{task.id}-{task.action}"
-        run_as_background_process(description, wrapper)
+        run_as_background_process(task.action, wrapper)