diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 5642666411..b668bb5da1 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -672,14 +672,12 @@ class ReplicationCommandHandler:
cmd.instance_name, cmd.lock_name, cmd.lock_key
)
- async def on_NEW_ACTIVE_TASK(
+ def on_NEW_ACTIVE_TASK(
self, conn: IReplicationConnection, cmd: NewActiveTaskCommand
) -> None:
"""Called when get a new NEW_ACTIVE_TASK command."""
if self._task_scheduler:
- task = await self._task_scheduler.get_task(cmd.data)
- if task:
- await self._task_scheduler._launch_task(task)
+ self._task_scheduler.launch_task_by_id(cmd.data)
def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection."""
diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py
index 9ab120eea9..5c5372a825 100644
--- a/synapse/storage/databases/main/task_scheduler.py
+++ b/synapse/storage/databases/main/task_scheduler.py
@@ -53,6 +53,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
resource_id: Optional[str] = None,
statuses: Optional[List[TaskStatus]] = None,
max_timestamp: Optional[int] = None,
+ limit: Optional[int] = None,
) -> List[ScheduledTask]:
"""Get a list of scheduled tasks from the DB.
@@ -62,6 +63,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
statuses: Limit the returned tasks to the specific statuses
max_timestamp: Limit the returned tasks to the ones that have
a timestamp inferior to the specified one
+ limit: Only return `limit` number of rows if set.
Returns: a list of `ScheduledTask`, ordered by increasing timestamps
"""
@@ -94,6 +96,10 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
sql = sql + " ORDER BY timestamp"
+ if limit is not None:
+ sql += " LIMIT ?"
+ args.append(limit)
+
txn.execute(sql, args)
return self.db_pool.cursor_to_dict(txn)
diff --git a/synapse/storage/schema/main/delta/82/02_scheduled_tasks_index.sql b/synapse/storage/schema/main/delta/82/02_scheduled_tasks_index.sql
new file mode 100644
index 0000000000..6b90275139
--- /dev/null
+++ b/synapse/storage/schema/main/delta/82/02_scheduled_tasks_index.sql
@@ -0,0 +1,16 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX IF NOT EXISTS scheduled_tasks_timestamp ON scheduled_tasks(timestamp);
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index b7de201bde..caf13b3474 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -15,12 +15,14 @@
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple
-from prometheus_client import Gauge
-
from twisted.python.failure import Failure
from synapse.logging.context import nested_logging_context
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util.stringutils import random_string
@@ -30,12 +32,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-running_tasks_gauge = Gauge(
- "synapse_scheduler_running_tasks",
- "The number of concurrent running tasks handled by the TaskScheduler",
-)
-
-
class TaskScheduler:
"""
This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background`
@@ -70,6 +66,8 @@ class TaskScheduler:
# Precision of the scheduler, evaluation of tasks to run will only happen
# every `SCHEDULE_INTERVAL_MS` ms
SCHEDULE_INTERVAL_MS = 1 * 60 * 1000 # 1mn
+ # How often to clean up old tasks.
+ CLEANUP_INTERVAL_MS = 30 * 60 * 1000
# Time before a complete or failed task is deleted from the DB
KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week
# Maximum number of tasks that can run at the same time
@@ -92,14 +90,26 @@ class TaskScheduler:
] = {}
self._run_background_tasks = hs.config.worker.run_background_tasks
+    # Flag to make sure we only try to launch new tasks once at a time.
+ self._launching_new_tasks = False
+
if self._run_background_tasks:
self._clock.looping_call(
- run_as_background_process,
+ self._launch_scheduled_tasks,
+ TaskScheduler.SCHEDULE_INTERVAL_MS,
+ )
+ self._clock.looping_call(
+ self._clean_scheduled_tasks,
TaskScheduler.SCHEDULE_INTERVAL_MS,
- "handle_scheduled_tasks",
- self._handle_scheduled_tasks,
)
+ LaterGauge(
+ "synapse_scheduler_running_tasks",
+ "The number of concurrent running tasks handled by the TaskScheduler",
+ labels=None,
+ caller=lambda: len(self._running_tasks),
+ )
+
def register_action(
self,
function: Callable[
@@ -234,6 +244,7 @@ class TaskScheduler:
resource_id: Optional[str] = None,
statuses: Optional[List[TaskStatus]] = None,
max_timestamp: Optional[int] = None,
+ limit: Optional[int] = None,
) -> List[ScheduledTask]:
"""Get a list of tasks. Returns all the tasks if no args is provided.
@@ -247,6 +258,7 @@ class TaskScheduler:
statuses: Limit the returned tasks to the specific statuses
max_timestamp: Limit the returned tasks to the ones that have
a timestamp inferior to the specified one
+ limit: Only return `limit` number of rows if set.
Returns
A list of `ScheduledTask`, ordered by increasing timestamps
@@ -256,6 +268,7 @@ class TaskScheduler:
resource_id=resource_id,
statuses=statuses,
max_timestamp=max_timestamp,
+ limit=limit,
)
async def delete_task(self, id: str) -> None:
@@ -273,34 +286,58 @@ class TaskScheduler:
raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
await self._store.delete_scheduled_task(id)
- async def _handle_scheduled_tasks(self) -> None:
- """Main loop taking care of launching tasks and cleaning up old ones."""
- await self._launch_scheduled_tasks()
- await self._clean_scheduled_tasks()
+ def launch_task_by_id(self, id: str) -> None:
+ """Try launching the task with the given ID."""
+ # Don't bother trying to launch new tasks if we're already at capacity.
+ if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
+ return
+
+ run_as_background_process("launch_task_by_id", self._launch_task_by_id, id)
+
+ async def _launch_task_by_id(self, id: str) -> None:
+ """Helper async function for `launch_task_by_id`."""
+ task = await self.get_task(id)
+ if task:
+ await self._launch_task(task)
+ @wrap_as_background_process("launch_scheduled_tasks")
async def _launch_scheduled_tasks(self) -> None:
"""Retrieve and launch scheduled tasks that should be running at that time."""
- for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]):
- await self._launch_task(task)
- for task in await self.get_tasks(
- statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec()
- ):
- await self._launch_task(task)
+ # Don't bother trying to launch new tasks if we're already at capacity.
+ if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
+ return
+
+ if self._launching_new_tasks:
+ return
- running_tasks_gauge.set(len(self._running_tasks))
+ self._launching_new_tasks = True
+ try:
+ for task in await self.get_tasks(
+ statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS
+ ):
+ await self._launch_task(task)
+ for task in await self.get_tasks(
+ statuses=[TaskStatus.SCHEDULED],
+ max_timestamp=self._clock.time_msec(),
+ limit=self.MAX_CONCURRENT_RUNNING_TASKS,
+ ):
+ await self._launch_task(task)
+
+ finally:
+ self._launching_new_tasks = False
+
+ @wrap_as_background_process("clean_scheduled_tasks")
async def _clean_scheduled_tasks(self) -> None:
"""Clean old complete or failed jobs to avoid clutter the DB."""
+ now = self._clock.time_msec()
for task in await self._store.get_scheduled_tasks(
- statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE]
+ statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE],
+ max_timestamp=now - TaskScheduler.KEEP_TASKS_FOR_MS,
):
# FAILED and COMPLETE tasks should never be running
assert task.id not in self._running_tasks
- if (
- self._clock.time_msec()
- > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS
- ):
- await self._store.delete_scheduled_task(task.id)
+ await self._store.delete_scheduled_task(task.id)
async def _launch_task(self, task: ScheduledTask) -> None:
"""Launch a scheduled task now.
@@ -339,6 +376,9 @@ class TaskScheduler:
)
self._running_tasks.remove(task.id)
+    # Try to launch a new task since we've finished with this one.
+ self._clock.call_later(1, self._launch_scheduled_tasks)
+
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return
@@ -355,4 +395,4 @@ class TaskScheduler:
self._running_tasks.add(task.id)
await self.update_task(task.id, status=TaskStatus.ACTIVE)
- run_as_background_process(task.action, wrapper)
+ run_as_background_process(f"task-{task.action}", wrapper)
|