diff --git a/changelog.d/17962.misc b/changelog.d/17962.misc
new file mode 100644
index 0000000000..adf6348707
--- /dev/null
+++ b/changelog.d/17962.misc
@@ -0,0 +1 @@
+Fix new scheduled tasks jumping the queue.
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 7d51441e91..6ab5356660 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -495,7 +495,7 @@ class LockReleasedCommand(Command):
class NewActiveTaskCommand(_SimpleCommand):
- """Sent to inform instance handling background tasks that a new active task is available to run.
+ """Sent to inform instance handling background tasks that a new task is ready to run.
Format::
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 6101226938..1fafbb48c3 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -727,7 +727,7 @@ class ReplicationCommandHandler:
) -> None:
"""Called when get a new NEW_ACTIVE_TASK command."""
if self._task_scheduler:
- self._task_scheduler.launch_task_by_id(cmd.data)
+ self._task_scheduler.on_new_task(cmd.data)
def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection."""
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 448960b297..3ed457bd30 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -174,9 +174,10 @@ class TaskScheduler:
The id of the scheduled task
"""
status = TaskStatus.SCHEDULED
+ start_now = False
if timestamp is None or timestamp < self._clock.time_msec():
timestamp = self._clock.time_msec()
- status = TaskStatus.ACTIVE
+ start_now = True
task = ScheduledTask(
random_string(16),
@@ -190,9 +191,11 @@ class TaskScheduler:
)
await self._store.insert_scheduled_task(task)
- if status == TaskStatus.ACTIVE:
+ # If the task is ready to run immediately, run the scheduling algorithm now
+ # rather than waiting
+ if start_now:
if self._run_background_tasks:
- await self._launch_task(task)
+ self._launch_scheduled_tasks()
else:
self._hs.get_replication_command_handler().send_new_active_task(task.id)
@@ -300,23 +303,13 @@ class TaskScheduler:
raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
await self._store.delete_scheduled_task(id)
- def launch_task_by_id(self, id: str) -> None:
- """Try launching the task with the given ID."""
- # Don't bother trying to launch new tasks if we're already at capacity.
- if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
- return
-
- run_as_background_process("launch_task_by_id", self._launch_task_by_id, id)
-
- async def _launch_task_by_id(self, id: str) -> None:
- """Helper async function for `launch_task_by_id`."""
- task = await self.get_task(id)
- if task:
- await self._launch_task(task)
+ def on_new_task(self, task_id: str) -> None:
+ """Handle a notification that a new ready-to-run task has been added to the queue"""
+ # Just run the scheduler
+ self._launch_scheduled_tasks()
- @wrap_as_background_process("launch_scheduled_tasks")
- async def _launch_scheduled_tasks(self) -> None:
- """Retrieve and launch scheduled tasks that should be running at that time."""
+ def _launch_scheduled_tasks(self) -> None:
+ """Retrieve and launch scheduled tasks that should be running at this time."""
# Don't bother trying to launch new tasks if we're already at capacity.
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return
@@ -326,20 +319,26 @@ class TaskScheduler:
self._launching_new_tasks = True
- try:
- for task in await self.get_tasks(
- statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS
- ):
- await self._launch_task(task)
- for task in await self.get_tasks(
- statuses=[TaskStatus.SCHEDULED],
- max_timestamp=self._clock.time_msec(),
- limit=self.MAX_CONCURRENT_RUNNING_TASKS,
- ):
- await self._launch_task(task)
-
- finally:
- self._launching_new_tasks = False
+ async def inner() -> None:
+ try:
+ for task in await self.get_tasks(
+ statuses=[TaskStatus.ACTIVE],
+ limit=self.MAX_CONCURRENT_RUNNING_TASKS,
+ ):
+ # _launch_task will ignore tasks that we're already running, and
+ # will also do nothing if we're already at the maximum capacity.
+ await self._launch_task(task)
+ for task in await self.get_tasks(
+ statuses=[TaskStatus.SCHEDULED],
+ max_timestamp=self._clock.time_msec(),
+ limit=self.MAX_CONCURRENT_RUNNING_TASKS,
+ ):
+ await self._launch_task(task)
+
+ finally:
+ self._launching_new_tasks = False
+
+ run_as_background_process("launch_scheduled_tasks", inner)
@wrap_as_background_process("clean_scheduled_tasks")
async def _clean_scheduled_tasks(self) -> None:
diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py
index 30f0510c9f..9e403b948b 100644
--- a/tests/util/test_task_scheduler.py
+++ b/tests/util/test_task_scheduler.py
@@ -18,8 +18,7 @@
# [This file includes modifications made by New Vector Limited]
#
#
-
-from typing import Optional, Tuple
+from typing import List, Optional, Tuple
from twisted.internet.task import deferLater
from twisted.test.proto_helpers import MemoryReactor
@@ -104,33 +103,43 @@ class TestTaskScheduler(HomeserverTestCase):
)
)
- # This is to give the time to the active tasks to finish
+ def get_tasks_of_status(status: TaskStatus) -> List[ScheduledTask]:
+ tasks = (
+ self.get_success(self.task_scheduler.get_task(task_id))
+ for task_id in task_ids
+ )
+ return [t for t in tasks if t is not None and t.status == status]
+
+ # At this point, there should be MAX_CONCURRENT_RUNNING_TASKS active tasks and
+ # one scheduled task.
+ self.assertEquals(
+ len(get_tasks_of_status(TaskStatus.ACTIVE)),
+ TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
+ )
+ self.assertEquals(
+ len(get_tasks_of_status(TaskStatus.SCHEDULED)),
+ 1,
+ )
+
+ # Give the time to the active tasks to finish
self.reactor.advance(1)
- # Check that only MAX_CONCURRENT_RUNNING_TASKS tasks has run and that one
+ # Check that MAX_CONCURRENT_RUNNING_TASKS tasks have run and that one
# is still scheduled.
- tasks = [
- self.get_success(self.task_scheduler.get_task(task_id))
- for task_id in task_ids
- ]
-
self.assertEquals(
- len(
- [t for t in tasks if t is not None and t.status == TaskStatus.COMPLETE]
- ),
+ len(get_tasks_of_status(TaskStatus.COMPLETE)),
TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
)
-
- scheduled_tasks = [
- t for t in tasks if t is not None and t.status == TaskStatus.ACTIVE
- ]
+ scheduled_tasks = get_tasks_of_status(TaskStatus.SCHEDULED)
self.assertEquals(len(scheduled_tasks), 1)
- # We need to wait for the next run of the scheduler loop
- self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
- self.reactor.advance(1)
+ # The scheduled task should start 0.1s after the first of the active tasks
+ # finishes
+ self.reactor.advance(0.1)
+ self.assertEquals(len(get_tasks_of_status(TaskStatus.ACTIVE)), 1)
- # Check that the last task has been properly executed after the next scheduler loop run
+ # ... and should finally complete after another second
+ self.reactor.advance(1)
prev_scheduled_task = self.get_success(
self.task_scheduler.get_task(scheduled_tasks[0].id)
)
|