diff --git a/changelog.d/18214.feature b/changelog.d/18214.feature
new file mode 100644
index 0000000000..751cb7d383
--- /dev/null
+++ b/changelog.d/18214.feature
@@ -0,0 +1 @@
+Add an Admin API endpoint `GET /_synapse/admin/v1/scheduled_tasks` to fetch scheduled tasks.
\ No newline at end of file
diff --git a/docs/admin_api/scheduled_tasks.md b/docs/admin_api/scheduled_tasks.md
new file mode 100644
index 0000000000..1708871a6d
--- /dev/null
+++ b/docs/admin_api/scheduled_tasks.md
@@ -0,0 +1,54 @@
+# Show scheduled tasks
+
+This API returns information about scheduled tasks.
+
+To use it, you will need to authenticate by providing an `access_token`
+for a server admin: see [Admin API](../usage/administration/admin_api/).
+
+The api is:
+```
+GET /_synapse/admin/v1/scheduled_tasks
+```
+
+It returns a JSON body like the following:
+
+```json
+{
+ "scheduled_tasks": [
+ {
+ "id": "GSA124oegf1",
+ "action": "shutdown_room",
+ "status": "complete",
+ "timestamp": 23423523,
+ "resource_id": "!roomid",
+ "result": "some result",
+ "error": null
+ }
+ ]
+}
+```
+
+**Query parameters:**
+
+* `action_name`: string - Is optional. Returns only the scheduled tasks with the given action name.
+* `resource_id`: string - Is optional. Returns only the scheduled tasks with the given resource id.
+* `status`: string - Is optional. Returns only the scheduled tasks matching the given status, one of
+ - "scheduled" - Task is scheduled but not active
+ - "active" - Task is active and probably running, and if not will be run on next scheduler loop run
+ - "complete" - Task has completed successfully
+ - "failed" - Task is over and either returned a failed status, or had an exception
+
+* `max_timestamp`: int - Is optional. Returns only the scheduled tasks with a timestamp inferior to the specified one.
+
+**Response**
+
+The following fields are returned in the JSON response body along with a `200` HTTP status code:
+
+* `id`: string - ID of scheduled task.
+* `action`: string - The name of the scheduled task's action.
+* `status`: string - The status of the scheduled task.
+* `timestamp_ms`: integer - The timestamp (in milliseconds since the unix epoch) of the given task - If the status is "scheduled" then this represents when it should be launched.
+ Otherwise it represents the last time this task got a change of state.
+* `resource_id`: Optional string - The resource id of the scheduled task, if it possesses one
+* `result`: Optional Json - Any result of the scheduled task, if given
+* `error`: Optional string - If the task has the status "failed", the error associated with this failure
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 5977ded4a0..cf809d1a27 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -86,6 +86,7 @@ from synapse.rest.admin.rooms import (
RoomStateRestServlet,
RoomTimestampToEventRestServlet,
)
+from synapse.rest.admin.scheduled_tasks import ScheduledTasksRestServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.statistics import (
LargestRoomsStatistics,
@@ -338,6 +339,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
BackgroundUpdateStartJobRestServlet(hs).register(http_server)
ExperimentalFeaturesRestServlet(hs).register(http_server)
SuspendAccountRestServlet(hs).register(http_server)
+ ScheduledTasksRestServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(
diff --git a/synapse/rest/admin/scheduled_tasks.py b/synapse/rest/admin/scheduled_tasks.py
new file mode 100644
index 0000000000..2ae13021b9
--- /dev/null
+++ b/synapse/rest/admin/scheduled_tasks.py
@@ -0,0 +1,70 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2025 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+#
+#
+from typing import TYPE_CHECKING, Tuple
+
+from synapse.http.servlet import RestServlet, parse_integer, parse_string
+from synapse.http.site import SynapseRequest
+from synapse.rest.admin import admin_patterns, assert_requester_is_admin
+from synapse.types import JsonDict, TaskStatus
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
+class ScheduledTasksRestServlet(RestServlet):
+ """Get a list of scheduled tasks and their statuses
+ optionally filtered by action name, resource id, status, and max timestamp
+ """
+
+ PATTERNS = admin_patterns("/scheduled_tasks$")
+
+ def __init__(self, hs: "HomeServer"):
+ self._auth = hs.get_auth()
+ self._store = hs.get_datastores().main
+
+ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+ await assert_requester_is_admin(self._auth, request)
+
+ # extract query params
+ action_name = parse_string(request, "action_name")
+ resource_id = parse_string(request, "resource_id")
+ status = parse_string(request, "job_status")
+ max_timestamp = parse_integer(request, "max_timestamp")
+
+ actions = [action_name] if action_name else None
+ statuses = [TaskStatus(status)] if status else None
+
+ tasks = await self._store.get_scheduled_tasks(
+ actions=actions,
+ resource_id=resource_id,
+ statuses=statuses,
+ max_timestamp=max_timestamp,
+ )
+
+ json_tasks = []
+ for task in tasks:
+ result_task = {
+ "id": task.id,
+ "action": task.action,
+ "status": task.status,
+ "timestamp_ms": task.timestamp,
+ "resource_id": task.resource_id,
+ "result": task.result,
+ "error": task.error,
+ }
+ json_tasks.append(result_task)
+
+ return 200, {"scheduled_tasks": json_tasks}
diff --git a/tests/rest/admin/test_scheduled_tasks.py b/tests/rest/admin/test_scheduled_tasks.py
new file mode 100644
index 0000000000..9654e9322b
--- /dev/null
+++ b/tests/rest/admin/test_scheduled_tasks.py
@@ -0,0 +1,192 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2025 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+#
+#
+from typing import Mapping, Optional, Tuple
+
+from twisted.test.proto_helpers import MemoryReactor
+
+import synapse.rest.admin
+from synapse.api.errors import Codes
+from synapse.rest.client import login
+from synapse.server import HomeServer
+from synapse.types import JsonMapping, ScheduledTask, TaskStatus
+from synapse.util import Clock
+
+from tests import unittest
+
+
+class ScheduledTasksAdminApiTestCase(unittest.HomeserverTestCase):
+ servlets = [
+ synapse.rest.admin.register_servlets,
+ login.register_servlets,
+ ]
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.store = hs.get_datastores().main
+ self.admin_user = self.register_user("admin", "pass", admin=True)
+ self.admin_user_tok = self.login("admin", "pass")
+ self._task_scheduler = hs.get_task_scheduler()
+
+ # create and schedule a few tasks
+ async def _test_task(
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ return TaskStatus.ACTIVE, None, None
+
+ async def _finished_test_task(
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ return TaskStatus.COMPLETE, None, None
+
+ async def _failed_test_task(
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ return TaskStatus.FAILED, None, "Everything failed"
+
+ self._task_scheduler.register_action(_test_task, "test_task")
+ self.get_success(
+ self._task_scheduler.schedule_task("test_task", resource_id="test")
+ )
+
+ self._task_scheduler.register_action(_finished_test_task, "finished_test_task")
+ self.get_success(
+ self._task_scheduler.schedule_task(
+ "finished_test_task", resource_id="finished_task"
+ )
+ )
+
+ self._task_scheduler.register_action(_failed_test_task, "failed_test_task")
+ self.get_success(
+ self._task_scheduler.schedule_task(
+ "failed_test_task", resource_id="failed_task"
+ )
+ )
+
+ def check_scheduled_tasks_response(self, scheduled_tasks: Mapping) -> list:
+ result = []
+ for task in scheduled_tasks:
+ if task["resource_id"] == "test":
+ self.assertEqual(task["status"], TaskStatus.ACTIVE)
+ self.assertEqual(task["action"], "test_task")
+ result.append(task)
+ if task["resource_id"] == "finished_task":
+ self.assertEqual(task["status"], TaskStatus.COMPLETE)
+ self.assertEqual(task["action"], "finished_test_task")
+ result.append(task)
+ if task["resource_id"] == "failed_task":
+ self.assertEqual(task["status"], TaskStatus.FAILED)
+ self.assertEqual(task["action"], "failed_test_task")
+ result.append(task)
+
+ return result
+
+ def test_requester_is_not_admin(self) -> None:
+ """
+ If the user is not a server admin, an error 403 is returned.
+ """
+
+ self.register_user("user", "pass", admin=False)
+ other_user_tok = self.login("user", "pass")
+
+ channel = self.make_request(
+ "GET",
+ "/_synapse/admin/v1/scheduled_tasks",
+ content={},
+ access_token=other_user_tok,
+ )
+
+ self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+ def test_scheduled_tasks(self) -> None:
+ """
+ Test that endpoint returns scheduled tasks.
+ """
+
+ channel = self.make_request(
+ "GET",
+ "/_synapse/admin/v1/scheduled_tasks",
+ content={},
+ access_token=self.admin_user_tok,
+ )
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ scheduled_tasks = channel.json_body["scheduled_tasks"]
+
+ # make sure we got back all the scheduled tasks
+ found_tasks = self.check_scheduled_tasks_response(scheduled_tasks)
+ self.assertEqual(len(found_tasks), 3)
+
+ def test_filtering_scheduled_tasks(self) -> None:
+ """
+ Test that filtering the scheduled tasks response via query params works as expected.
+ """
+ # filter via job_status
+ channel = self.make_request(
+ "GET",
+ "/_synapse/admin/v1/scheduled_tasks?job_status=active",
+ content={},
+ access_token=self.admin_user_tok,
+ )
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ scheduled_tasks = channel.json_body["scheduled_tasks"]
+ found_tasks = self.check_scheduled_tasks_response(scheduled_tasks)
+
+ # only the active task should have been returned
+ self.assertEqual(len(found_tasks), 1)
+ self.assertEqual(found_tasks[0]["status"], "active")
+
+ # filter via action_name
+ channel = self.make_request(
+ "GET",
+ "/_synapse/admin/v1/scheduled_tasks?action_name=test_task",
+ content={},
+ access_token=self.admin_user_tok,
+ )
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ scheduled_tasks = channel.json_body["scheduled_tasks"]
+
+ # only test_task should have been returned
+ found_tasks = self.check_scheduled_tasks_response(scheduled_tasks)
+ self.assertEqual(len(found_tasks), 1)
+ self.assertEqual(found_tasks[0]["action"], "test_task")
+
+ # filter via max_timestamp
+ channel = self.make_request(
+ "GET",
+ "/_synapse/admin/v1/scheduled_tasks?max_timestamp=0",
+ content={},
+ access_token=self.admin_user_tok,
+ )
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ scheduled_tasks = channel.json_body["scheduled_tasks"]
+ found_tasks = self.check_scheduled_tasks_response(scheduled_tasks)
+
+ # none should have been returned
+ self.assertEqual(len(found_tasks), 0)
+
+ # filter via resource id
+ channel = self.make_request(
+ "GET",
+ "/_synapse/admin/v1/scheduled_tasks?resource_id=failed_task",
+ content={},
+ access_token=self.admin_user_tok,
+ )
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ scheduled_tasks = channel.json_body["scheduled_tasks"]
+ found_tasks = self.check_scheduled_tasks_response(scheduled_tasks)
+
+ # only the task with the matching resource id should have been returned
+ self.assertEqual(len(found_tasks), 1)
+ self.assertEqual(found_tasks[0]["resource_id"], "failed_task")
|