summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/pagination.py362
-rw-r--r--synapse/handlers/room.py91
-rw-r--r--synapse/handlers/room_member.py12
-rw-r--r--synapse/rest/admin/__init__.py17
-rw-r--r--synapse/rest/admin/rooms.py49
-rw-r--r--synapse/storage/databases/main/roommember.py108
-rw-r--r--synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql27
-rw-r--r--tests/rest/admin/test_room.py112
-rw-r--r--tests/rest/admin/test_server_notice.py5
-rw-r--r--tests/rest/client/test_rooms.py5
10 files changed, 213 insertions, 575 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 982b38cc43..c52676709d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -12,9 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import json
 import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple
 
 from twisted.python.failure import Failure
 
@@ -22,12 +21,19 @@ from synapse.api.constants import Direction, EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
 from synapse.events.utils import SerializeEventConfig
-from synapse.handlers.room import DeleteStatus, ShutdownRoomParams, ShutdownRoomResponse
+from synapse.handlers.room import ShutdownRoomParams
 from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.rest.admin._base import assert_user_is_admin
 from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StreamKeyType
+from synapse.types import (
+    JsonDict,
+    JsonMapping,
+    Requester,
+    ScheduledTask,
+    StreamKeyType,
+    TaskStatus,
+)
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import ReadWriteLock
 from synapse.util.stringutils import random_string
@@ -52,12 +58,6 @@ class PaginationHandler:
     paginating during a purge.
     """
 
-    # when to remove a completed deletion/purge from the results map
-    CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24  # 24 hours
-
-    # how often to run the purge rooms loop
-    PURGE_ROOMS_INTERVAL_MS = 1000 * 3600  # 1 hour
-
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.auth = hs.get_auth()
@@ -68,6 +68,7 @@ class PaginationHandler:
         self._server_name = hs.hostname
         self._room_shutdown_handler = hs.get_room_shutdown_handler()
         self._relations_handler = hs.get_relations_handler()
+        self._task_scheduler = hs.get_task_scheduler()
 
         self.pagination_lock = ReadWriteLock()
         # IDs of rooms in which there currently an active purge *or delete* operation.
@@ -101,95 +102,11 @@ class PaginationHandler:
                     job.longest_max_lifetime,
                 )
 
-        if self._is_master:
-            self.clock.looping_call(
-                run_as_background_process,
-                PaginationHandler.PURGE_ROOMS_INTERVAL_MS,
-                "purge_rooms",
-                self.purge_rooms,
-            )
-
-    async def purge_rooms(self) -> None:
-        """This takes care of restoring unfinished purge/shutdown rooms from the DB.
-        It also takes care to launch scheduled ones, like rooms that has been fully
-        forgotten.
-
-        It should be run regularly.
-        """
-        rooms_to_delete = await self.store.get_rooms_to_delete()
-        for r in rooms_to_delete:
-            room_id = r["room_id"]
-            delete_id = r["delete_id"]
-            status = r["status"]
-            action = r["action"]
-            timestamp = r["timestamp"]
-
-            if (
-                status == DeleteStatus.STATUS_COMPLETE
-                or status == DeleteStatus.STATUS_FAILED
-            ):
-                # remove the delete from the list 24 hours after it completes or fails
-                ms_since_completed = self.clock.time_msec() - timestamp
-                if ms_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS:
-                    await self.store.delete_room_to_delete(room_id, delete_id)
-
-                continue
-
-            if room_id in self._purges_in_progress_by_room:
-                # a delete background task is already running (or has run)
-                # for this room id, let's ignore it for now
-                continue
-
-            # If the database says we were last in the middle of shutting down the room,
-            # let's continue the shutdown process.
-            shutdown_response = None
-            if (
-                action == DeleteStatus.ACTION_SHUTDOWN
-                and status == DeleteStatus.STATUS_SHUTTING_DOWN
-            ):
-                shutdown_params = json.loads(r["params"])
-                if r["response"]:
-                    shutdown_response = json.loads(r["response"])
-                await self._shutdown_and_purge_room(
-                    room_id,
-                    delete_id,
-                    shutdown_params=shutdown_params,
-                    shutdown_response=shutdown_response,
-                )
-                continue
-
-            # If the database says we were last in the middle of purging the room,
-            # let's continue the purge process.
-            if status == DeleteStatus.STATUS_PURGING:
-                purge_now = True
-            # Or if we're at or past the scheduled purge time, let's start that one as well
-            elif status == DeleteStatus.STATUS_SCHEDULED and (
-                timestamp is None or self.clock.time_msec() >= timestamp
-            ):
-                purge_now = True
-
-            # TODO 2 stages purge, keep memberships for a while so we don't "break" sync
-            if purge_now:
-                params = {}
-                if r["params"]:
-                    params = json.loads(r["params"])
-
-                if action == DeleteStatus.ACTION_PURGE_HISTORY:
-                    if "token" in params:
-                        await self._purge_history(
-                            delete_id,
-                            room_id,
-                            params["token"],
-                            params.get("delete_local_events", False),
-                            True,
-                        )
-                elif action == DeleteStatus.ACTION_PURGE:
-                    await self.purge_room(
-                        room_id,
-                        delete_id,
-                        params.get("force", False),
-                        shutdown_response=shutdown_response,
-                    )
+        self._task_scheduler.register_action(self._purge_history, "purge_history")
+        self._task_scheduler.register_action(self._purge_room, "purge_room")
+        self._task_scheduler.register_action(
+            self._shutdown_and_purge_room, "shutdown_and_purge_room"
+        )
 
     async def purge_history_for_rooms_in_range(
         self, min_ms: Optional[int], max_ms: Optional[int]
@@ -241,14 +158,6 @@ class PaginationHandler:
         for room_id, retention_policy in rooms.items():
             logger.info("[purge] Attempting to purge messages in room %s", room_id)
 
-            if room_id in self._purges_in_progress_by_room:
-                logger.warning(
-                    "[purge] not purging room %s as there's an ongoing purge running"
-                    " for this room",
-                    room_id,
-                )
-                continue
-
             # If max_lifetime is None, it means that the room has no retention policy.
             # Given we only retrieve such rooms when there's a default retention policy
             # defined in the server's configuration, we can safely assume that's the
@@ -330,46 +239,49 @@ class PaginationHandler:
         Returns:
             unique ID for this purge transaction.
         """
-        if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400, "History purge already in progress for %s" % (room_id,)
-            )
-
-        purge_id = random_string(16)
+        purge_id = await self._task_scheduler.schedule_task(
+            "purge_history",
+            resource_id=room_id,
+            params={"token": token, "delete_local_events": delete_local_events},
+        )
 
         # we log the purge_id here so that it can be tied back to the
         # request id in the log lines.
         logger.info("[purge] starting purge_id %s", purge_id)
 
-        await self.store.upsert_room_to_delete(
-            room_id,
-            purge_id,
-            DeleteStatus.ACTION_PURGE_HISTORY,
-            DeleteStatus.STATUS_PURGING,
-            params=json.dumps(
-                {"token": token, "delete_local_events": delete_local_events}
-            ),
-        )
-
-        run_as_background_process(
-            "purge_history",
-            self._purge_history,
-            purge_id,
-            room_id,
-            token,
-            delete_local_events,
-            True,
-        )
         return purge_id
 
     async def _purge_history(
         self,
-        purge_id: str,
+        task: ScheduledTask,
+        first_launch: bool,
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+        if (
+            task.resource_id is None
+            or task.params is None
+            or "token" not in task.params
+            or "delete_local_events" not in task.params
+        ):
+            return (
+                TaskStatus.FAILED,
+                None,
+                "Not enough parameters passed to _purge_history",
+            )
+        err = await self.purge_history(
+            task.resource_id,
+            task.params["token"],
+            task.params["delete_local_events"],
+        )
+        if err is not None:
+            return TaskStatus.FAILED, None, err
+        return TaskStatus.COMPLETE, None, None
+
+    async def purge_history(
+        self,
         room_id: str,
         token: str,
         delete_local_events: bool,
-        update_rooms_to_delete_table: bool,
-    ) -> None:
+    ) -> Optional[str]:
         """Carry out a history purge on a room.
 
         Args:
@@ -382,88 +294,54 @@ class PaginationHandler:
                 functionality since we don't need to explicitly restore those, they
                 will be relaunch by the retention logic.
         """
-        self._purges_in_progress_by_room.add(room_id)
         try:
             async with self.pagination_lock.write(room_id):
                 await self._storage_controllers.purge_events.purge_history(
                     room_id, token, delete_local_events
                 )
             logger.info("[purge] complete")
-            if update_rooms_to_delete_table:
-                await self.store.upsert_room_to_delete(
-                    room_id,
-                    purge_id,
-                    DeleteStatus.ACTION_PURGE_HISTORY,
-                    DeleteStatus.STATUS_COMPLETE,
-                    timestamp=self.clock.time_msec(),
-                )
+            return None
         except Exception:
             f = Failure()
             logger.error(
                 "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
             )
-            if update_rooms_to_delete_table:
-                await self.store.upsert_room_to_delete(
-                    room_id,
-                    purge_id,
-                    DeleteStatus.ACTION_PURGE_HISTORY,
-                    DeleteStatus.STATUS_FAILED,
-                    error=f.getErrorMessage(),
-                    timestamp=self.clock.time_msec(),
-                )
-        finally:
-            self._purges_in_progress_by_room.discard(room_id)
-
-            if update_rooms_to_delete_table:
-                # remove the purge from the list 24 hours after it completes
-                async def clear_purge() -> None:
-                    await self.store.delete_room_to_delete(room_id, purge_id)
-
-                self.hs.get_reactor().callLater(
-                    PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
-                )
+            return f.getErrorMessage()
 
-    @staticmethod
-    def _convert_to_delete_status(res: Dict[str, Any]) -> DeleteStatus:
-        status = DeleteStatus()
-        status.delete_id = res["delete_id"]
-        status.action = res["action"]
-        status.status = res["status"]
-        if "error" in res:
-            status.error = res["error"]
-
-        if status.action == DeleteStatus.ACTION_SHUTDOWN and res["response"]:
-            status.shutdown_room = json.loads(res["response"])
-
-        return status
-
-    async def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
+    async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]:
         """Get the current status of an active deleting
 
         Args:
             delete_id: delete_id returned by start_shutdown_and_purge_room
                 or start_purge_history.
         """
-        res = await self.store.get_room_to_delete(delete_id)
-        if res:
-            return PaginationHandler._convert_to_delete_status(res)
-        return None
+        return await self._task_scheduler.get_task(delete_id)
 
-    async def get_delete_statuses_by_room(self, room_id: str) -> List[DeleteStatus]:
+    async def get_delete_tasks_by_room(self, room_id: str) -> List[ScheduledTask]:
         """Get all active delete statuses by room
 
         Args:
             room_id: room_id that is deleted
         """
-        res = await self.store.get_rooms_to_delete(room_id)
-        return [PaginationHandler._convert_to_delete_status(r) for r in res]
+        return await self._task_scheduler.get_tasks(
+            actions=["purge_room", "shutdown_and_purge_room"], resource_ids=[room_id]
+        )
+
+    async def _purge_room(
+        self,
+        task: ScheduledTask,
+        first_launch: bool,
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+        if not task.resource_id:
+            raise Exception("No room id passed to purge_room task")
+        params = task.params if task.params else {}
+        await self.purge_room(task.resource_id, params.get("force", False))
+        return TaskStatus.COMPLETE, None, None
 
     async def purge_room(
         self,
         room_id: str,
-        delete_id: str,
-        force: bool = False,
-        shutdown_response: Optional[ShutdownRoomResponse] = None,
+        force: bool,
     ) -> None:
         """Purge the given room from the database.
 
@@ -475,10 +353,6 @@ class PaginationHandler:
         """
         logger.info("starting purge room_id=%s force=%s", room_id, force)
 
-        action = DeleteStatus.ACTION_PURGE
-        if shutdown_response:
-            action = DeleteStatus.ACTION_SHUTDOWN
-
         async with self.pagination_lock.write(room_id):
             # first check that we have no users in this room
             joined = await self.store.is_host_joined(room_id, self._server_name)
@@ -491,25 +365,8 @@ class PaginationHandler:
                 else:
                     raise SynapseError(400, "Users are still joined to this room")
 
-            await self.store.upsert_room_to_delete(
-                room_id,
-                delete_id,
-                action,
-                DeleteStatus.STATUS_PURGING,
-                response=json.dumps(shutdown_response),
-            )
-
             await self._storage_controllers.purge_events.purge_room(room_id)
 
-            await self.store.upsert_room_to_delete(
-                room_id,
-                delete_id,
-                action,
-                DeleteStatus.STATUS_COMPLETE,
-                timestamp=self.clock.time_msec(),
-                response=json.dumps(shutdown_response),
-            )
-
         logger.info("purge complete for room_id %s", room_id)
 
     @trace
@@ -789,11 +646,9 @@ class PaginationHandler:
 
     async def _shutdown_and_purge_room(
         self,
-        room_id: str,
-        delete_id: str,
-        shutdown_params: ShutdownRoomParams,
-        shutdown_response: Optional[ShutdownRoomResponse] = None,
-    ) -> None:
+        task: ScheduledTask,
+        first_launch: bool,
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
         """
         Shuts down and purges a room.
 
@@ -807,50 +662,36 @@ class PaginationHandler:
 
         Keeps track of the `DeleteStatus` (and `ShutdownRoomResponse`) in `self._delete_by_id` and persisted in DB
         """
-
-        self._purges_in_progress_by_room.add(room_id)
-        try:
-            shutdown_response = await self._room_shutdown_handler.shutdown_room(
-                room_id=room_id,
-                delete_id=delete_id,
-                shutdown_params=shutdown_params,
-                shutdown_response=shutdown_response,
+        if task.resource_id is None or task.params is None:
+            raise Exception(
+                "No room id and/or no parameters passed to shutdown_and_purge_room task"
             )
 
-            if shutdown_params["purge"]:
-                await self.purge_room(
-                    room_id,
-                    delete_id,
-                    shutdown_params["force_purge"],
-                    shutdown_response=shutdown_response,
-                )
+        room_id = task.resource_id
 
-            await self.store.upsert_room_to_delete(
-                room_id,
-                delete_id,
-                DeleteStatus.ACTION_SHUTDOWN,
-                DeleteStatus.STATUS_COMPLETE,
-                timestamp=self.clock.time_msec(),
-                response=json.dumps(shutdown_response),
-            )
-        except Exception:
-            f = Failure()
-            logger.error(
-                "failed",
-                exc_info=(f.type, f.value, f.getTracebackObject()),
-            )
-            await self.store.upsert_room_to_delete(
+        async def update_result(result: Optional[JsonMapping]) -> None:
+            await self._task_scheduler.update_task(task.id, result=result)
+
+        shutdown_result = await self._room_shutdown_handler.shutdown_room(
+            room_id, task.params, task.result, update_result
+        )
+
+        if task.params.get("purge", False):
+            await self.purge_room(
                 room_id,
-                delete_id,
-                DeleteStatus.ACTION_SHUTDOWN,
-                DeleteStatus.STATUS_FAILED,
-                timestamp=self.clock.time_msec(),
-                error=f.getErrorMessage(),
+                task.params.get("force_purge", False),
             )
-        finally:
-            self._purges_in_progress_by_room.discard(room_id)
 
-    def start_shutdown_and_purge_room(
+        return (TaskStatus.COMPLETE, shutdown_result, None)
+
+    async def get_current_delete_tasks(self, room_id: str) -> List[ScheduledTask]:
+        return await self._task_scheduler.get_tasks(
+            actions=["purge_history", "purge_room", "shutdown_and_purge_room"],
+            resource_ids=[room_id],
+            statuses=[TaskStatus.ACTIVE, TaskStatus.SCHEDULED],
+        )
+
+    async def start_shutdown_and_purge_room(
         self,
         room_id: str,
         shutdown_params: ShutdownRoomParams,
@@ -864,7 +705,7 @@ class PaginationHandler:
         Returns:
             unique ID for this delete transaction.
         """
-        if room_id in self._purges_in_progress_by_room:
+        if len(await self.get_current_delete_tasks(room_id)) > 0:
             raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
 
         # This check is double to `RoomShutdownHandler.shutdown_room`
@@ -877,7 +718,11 @@ class PaginationHandler:
                     400, "User must be our own: %s" % (new_room_user_id,)
                 )
 
-        delete_id = random_string(16)
+        delete_id = await self._task_scheduler.schedule_task(
+            "shutdown_and_purge_room",
+            resource_id=room_id,
+            params=shutdown_params,
+        )
 
         # we log the delete_id here so that it can be tied back to the
         # request id in the log lines.
@@ -887,11 +732,4 @@ class PaginationHandler:
             delete_id,
         )
 
-        run_as_background_process(
-            "shutdown_and_purge_room",
-            self._shutdown_and_purge_room,
-            room_id,
-            delete_id,
-            shutdown_params,
-        )
         return delete_id
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f7f9d9d2f5..ce678d41fc 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -14,14 +14,13 @@
 
 """Contains functions for performing actions on rooms."""
 import itertools
-import json
 import logging
 import math
 import random
 import string
 from collections import OrderedDict
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple
 
 import attr
 from typing_extensions import TypedDict
@@ -59,6 +58,7 @@ from synapse.rest.admin._base import assert_user_is_admin
 from synapse.streams import EventSource
 from synapse.types import (
     JsonDict,
+    JsonMapping,
     MutableStateMap,
     Requester,
     RoomAlias,
@@ -1883,10 +1883,12 @@ class RoomShutdownHandler:
     async def shutdown_room(
         self,
         room_id: str,
-        delete_id: str,
-        shutdown_params: ShutdownRoomParams,
-        shutdown_response: Optional[ShutdownRoomResponse] = None,
-    ) -> ShutdownRoomResponse:
+        params: JsonMapping,
+        result: Optional[JsonMapping] = None,
+        update_result_fct: Optional[
+            Callable[[Optional[JsonMapping]], Awaitable[None]]
+        ] = None,
+    ) -> Optional[JsonMapping]:
         """
         Shuts down a room. Moves all local users and room aliases automatically
         to a new room if `new_room_user_id` is set. Otherwise local users only
@@ -1908,21 +1910,16 @@ class RoomShutdownHandler:
 
         Returns: a dict matching `ShutdownRoomResponse`.
         """
-
-        requester_user_id = shutdown_params["requester_user_id"]
-        new_room_user_id = shutdown_params["new_room_user_id"]
-        block = shutdown_params["block"]
+        requester_user_id = params["requester_user_id"]
+        new_room_user_id = params["new_room_user_id"]
+        block = params["block"]
 
         new_room_name = (
-            shutdown_params["new_room_name"]
-            if shutdown_params["new_room_name"]
+            params["new_room_name"]
+            if params["new_room_name"]
             else self.DEFAULT_ROOM_NAME
         )
-        message = (
-            shutdown_params["message"]
-            if shutdown_params["message"]
-            else self.DEFAULT_MESSAGE
-        )
+        message = params["message"] if params["message"] else self.DEFAULT_MESSAGE
 
         if not RoomID.is_valid(room_id):
             raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
@@ -1934,21 +1931,15 @@ class RoomShutdownHandler:
                 403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
             )
 
-        if not shutdown_response:
-            shutdown_response = {
+        result = (
+            dict(result)
+            if result
+            else {
                 "kicked_users": [],
                 "failed_to_kick_users": [],
                 "local_aliases": [],
                 "new_room_id": None,
             }
-
-        await self.store.upsert_room_to_delete(
-            room_id,
-            delete_id,
-            DeleteStatus.ACTION_SHUTDOWN,
-            DeleteStatus.STATUS_SHUTTING_DOWN,
-            params=json.dumps(shutdown_params),
-            response=json.dumps(shutdown_response),
         )
 
         # Action the block first (even if the room doesn't exist yet)
@@ -1959,9 +1950,9 @@ class RoomShutdownHandler:
 
         if not await self.store.get_room(room_id):
             # if we don't know about the room, there is nothing left to do.
-            return shutdown_response
+            return result
 
-        new_room_id = shutdown_response.get("new_room_id")
+        new_room_id = result.get("new_room_id")
         if new_room_user_id is not None and new_room_id is None:
             if not self.hs.is_mine_id(new_room_user_id):
                 raise SynapseError(
@@ -1982,15 +1973,9 @@ class RoomShutdownHandler:
                 ratelimit=False,
             )
 
-            shutdown_response["new_room_id"] = new_room_id
-            await self.store.upsert_room_to_delete(
-                room_id,
-                delete_id,
-                DeleteStatus.ACTION_SHUTDOWN,
-                DeleteStatus.STATUS_SHUTTING_DOWN,
-                params=json.dumps(shutdown_params),
-                response=json.dumps(shutdown_response),
-            )
+            result["new_room_id"] = new_room_id
+            if update_result_fct:
+                await update_result_fct(result)
 
             logger.info(
                 "Shutting down room %r, joining to new room: %r", room_id, new_room_id
@@ -2053,28 +2038,16 @@ class RoomShutdownHandler:
                         require_consent=False,
                     )
 
-                shutdown_response["kicked_users"].append(user_id)
-                await self.store.upsert_room_to_delete(
-                    room_id,
-                    delete_id,
-                    DeleteStatus.ACTION_SHUTDOWN,
-                    DeleteStatus.STATUS_SHUTTING_DOWN,
-                    params=json.dumps(shutdown_params),
-                    response=json.dumps(shutdown_response),
-                )
+                result["kicked_users"].append(user_id)
+                if update_result_fct:
+                    await update_result_fct(result)
             except Exception:
                 logger.exception(
                     "Failed to leave old room and join new room for %r", user_id
                 )
-                shutdown_response["failed_to_kick_users"].append(user_id)
-                await self.store.upsert_room_to_delete(
-                    room_id,
-                    delete_id,
-                    DeleteStatus.ACTION_SHUTDOWN,
-                    DeleteStatus.STATUS_SHUTTING_DOWN,
-                    params=json.dumps(shutdown_params),
-                    response=json.dumps(shutdown_response),
-                )
+                result["failed_to_kick_users"].append(user_id)
+                if update_result_fct:
+                    await update_result_fct(result)
 
         # Send message in new room and move aliases
         if new_room_user_id:
@@ -2093,7 +2066,7 @@ class RoomShutdownHandler:
                 ratelimit=False,
             )
 
-            shutdown_response["local_aliases"] = list(
+            result["local_aliases"] = list(
                 await self.store.get_aliases_for_room(room_id)
             )
 
@@ -2102,6 +2075,6 @@ class RoomShutdownHandler:
                 room_id, new_room_id, requester_user_id
             )
         else:
-            shutdown_response["local_aliases"] = []
+            result["local_aliases"] = []
 
-        return shutdown_response
+        return result
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index e3147924f7..db98cf856c 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -38,7 +38,6 @@ from synapse.event_auth import get_named_level, get_power_level_event
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
-from synapse.handlers.room import DeleteStatus
 from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
 from synapse.logging import opentracing
 from synapse.metrics import event_processing_positions
@@ -57,7 +56,6 @@ from synapse.types import (
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_left_room
-from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -96,6 +94,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         self.event_creation_handler = hs.get_event_creation_handler()
         self.account_data_handler = hs.get_account_data_handler()
         self.event_auth_handler = hs.get_event_auth_handler()
+        self.task_scheduler = hs.get_task_scheduler()
 
         self.member_linearizer: Linearizer = Linearizer(name="member")
         self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter")
@@ -318,12 +317,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             and self._purge_retention_period
             and await self.store.is_locally_forgotten_room(room_id)
         ):
-            delete_id = random_string(16)
-            await self.store.upsert_room_to_delete(
-                room_id,
-                delete_id,
-                DeleteStatus.ACTION_PURGE,
-                DeleteStatus.STATUS_SCHEDULED,
+            await self.task_scheduler.schedule_task(
+                "purge_room",
+                resource_id=room_id,
                 timestamp=self.clock.time_msec() + self._purge_retention_period,
             )
 
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 0cabdd1dc6..fafa9ea428 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -93,7 +93,7 @@ from synapse.rest.admin.users import (
     UserTokenRestServlet,
     WhoisRestServlet,
 )
-from synapse.types import JsonDict, RoomStreamToken
+from synapse.types import JsonDict, RoomStreamToken, TaskStatus
 from synapse.util import SYNAPSE_VERSION
 
 if TYPE_CHECKING:
@@ -215,12 +215,21 @@ class PurgeHistoryStatusRestServlet(RestServlet):
     ) -> Tuple[int, JsonDict]:
         await assert_requester_is_admin(self.auth, request)
 
-        purge_status = await self.pagination_handler.get_delete_status(purge_id)
-        if purge_status is None:
+        purge_task = await self.pagination_handler.get_delete_task(purge_id)
+        if purge_task is None or purge_task.action != "purge_history":
             raise NotFoundError("purge id '%s' not found" % purge_id)
 
+        result = {
+            "status": purge_task.status
+            if purge_task.status == TaskStatus.COMPLETE
+            or purge_task.status == TaskStatus.FAILED
+            else "active",
+        }
+        if purge_task.error:
+            result["error"] = purge_task.error
+
         # TODO active vs purging etc
-        return HTTPStatus.OK, purge_status.asdict(use_purge_history_format=True)
+        return HTTPStatus.OK, result
 
 
 ########################################################################################
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 9e31d018b1..62cdb9af38 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -19,7 +19,6 @@ from urllib import parse as urlparse
 from synapse.api.constants import Direction, EventTypes, JoinRules, Membership
 from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
 from synapse.api.filtering import Filter
-from synapse.handlers.room import DeleteStatus
 from synapse.http.servlet import (
     ResolveRoomIdMixin,
     RestServlet,
@@ -37,10 +36,16 @@ from synapse.rest.admin._base import (
 )
 from synapse.storage.databases.main.room import RoomSortOrder
 from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, RoomID, UserID, create_requester
+from synapse.types import (
+    JsonDict,
+    RoomID,
+    ScheduledTask,
+    TaskStatus,
+    UserID,
+    create_requester,
+)
 from synapse.types.state import StateFilter
 from synapse.util import json_decoder
-from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
     from synapse.api.auth import Auth
@@ -119,7 +124,7 @@ class RoomRestV2Servlet(RestServlet):
                 403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
             )
 
-        delete_id = self._pagination_handler.start_shutdown_and_purge_room(
+        delete_id = await self._pagination_handler.start_shutdown_and_purge_room(
             room_id=room_id,
             shutdown_params={
                 "new_room_user_id": content.get("new_room_user_id"),
@@ -135,6 +140,14 @@ class RoomRestV2Servlet(RestServlet):
         return HTTPStatus.OK, {"delete_id": delete_id}
 
 
+def _convert_delete_task_to_response(task: ScheduledTask) -> JsonDict:
+    return {
+        "delete_id": task.id,
+        "status": task.status,
+        "shutdown_room": task.result,
+    }
+
+
 class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
     """Get the status of the delete room background task."""
 
@@ -154,16 +167,14 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
                 HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,)
             )
 
-        delete_statuses = await self._pagination_handler.get_delete_statuses_by_room(
-            room_id
-        )
+        delete_tasks = await self._pagination_handler.get_delete_tasks_by_room(room_id)
 
         response = []
-        for delete_status in delete_statuses:
+        for delete_task in delete_tasks:
             # We ignore scheduled deletes because currently they are only used
             # for automatically purging forgotten room after X time.
-            if delete_status.status != DeleteStatus.STATUS_SCHEDULED:
-                response += [delete_status.asdict()]
+            if delete_task.status != TaskStatus.SCHEDULED:
+                response += [_convert_delete_task_to_response(delete_task)]
 
         if response:
             return HTTPStatus.OK, {"results": cast(JsonDict, response)}
@@ -185,11 +196,14 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
     ) -> Tuple[int, JsonDict]:
         await assert_requester_is_admin(self._auth, request)
 
-        delete_status = await self._pagination_handler.get_delete_status(delete_id)
-        if delete_status is None:
+        delete_task = await self._pagination_handler.get_delete_task(delete_id)
+        if delete_task is None or (
+            delete_task.action != "purge_room"
+            and delete_task.action != "shutdown_and_purge_room"
+        ):
             raise NotFoundError("delete id '%s' not found" % delete_id)
 
-        return HTTPStatus.OK, cast(JsonDict, delete_status.asdict())
+        return HTTPStatus.OK, _convert_delete_task_to_response(delete_task)
 
 
 class ListRoomRestServlet(RestServlet):
@@ -351,12 +365,9 @@ class RoomRestServlet(RestServlet):
                 Codes.BAD_JSON,
             )
 
-        delete_id = random_string(16)
-
         ret = await room_shutdown_handler.shutdown_room(
             room_id=room_id,
-            delete_id=delete_id,
-            shutdown_params={
+            params={
                 "new_room_user_id": content.get("new_room_user_id"),
                 "new_room_name": content.get("room_name"),
                 "message": content.get("message"),
@@ -370,9 +381,7 @@ class RoomRestServlet(RestServlet):
         # Purge room
         if purge:
             try:
-                await pagination_handler.purge_room(
-                    room_id, delete_id, force=force_purge
-                )
+                await pagination_handler.purge_room(room_id, force=force_purge)
             except NotFoundError:
                 if block:
                     # We can block unknown rooms with this endpoint, in which case
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 2c0b573d4c..582875c91a 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -17,7 +17,6 @@ from itertools import chain
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
-    Any,
     Collection,
     Dict,
     FrozenSet,
@@ -1284,113 +1283,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         # If any rows still exist it means someone has not forgotten this room yet
         return not rows[0][0]
 
-    async def upsert_room_to_delete(
-        self,
-        room_id: str,
-        delete_id: str,
-        action: str,
-        status: str,
-        timestamp: Optional[int] = None,
-        params: Optional[str] = None,
-        response: Optional[str] = None,
-        error: Optional[str] = None,
-    ) -> None:
-        """Insert or update a room to shutdown/purge.
-
-        Args:
-            room_id: The room ID to shutdown/purge
-            delete_id: The delete ID identifying this action
-            action: the type of job, mainly `shutdown` `purge` or `purge_history`
-            status: Current status of the delete. Cf `DeleteStatus` for possible values
-            timestamp: Time of the last update. If status is `wait_purge`,
-                then it specifies when to do the purge, with an empty value specifying ASAP
-            error: Error message to return, if any
-            params: JSON representation of delete job parameters
-            response: JSON representation of delete current status
-        """
-        await self.db_pool.simple_upsert(
-            "rooms_to_delete",
-            {
-                "room_id": room_id,
-                "delete_id": delete_id,
-            },
-            {
-                "action": action,
-                "status": status,
-                "timestamp": timestamp,
-                "params": params,
-                "response": response,
-                "error": error,
-            },
-            desc="upsert_room_to_delete",
-        )
-
-    async def delete_room_to_delete(self, room_id: str, delete_id: str) -> None:
-        """Remove a room from the list of rooms to purge.
-
-        Args:
-            room_id: The room ID matching the delete to remove
-            delete_id: The delete ID identifying the delete to remove
-        """
-
-        await self.db_pool.simple_delete(
-            "rooms_to_delete",
-            keyvalues={
-                "room_id": room_id,
-                "delete_id": delete_id,
-            },
-            desc="delete_room_to_delete",
-        )
-
-    async def get_rooms_to_delete(
-        self, room_id: Optional[str] = None
-    ) -> List[Dict[str, Any]]:
-        """Returns all delete jobs. This includes those that have been
-        interrupted by a stop/restart of synapse, but also scheduled ones
-        like locally forgotten rooms.
-
-        Args:
-            room_id: if specified, will only return the delete jobs for a specific room
-
-        """
-        keyvalues = {}
-        if room_id is not None:
-            keyvalues["room_id"] = room_id
-
-        return await self.db_pool.simple_select_list(
-            table="rooms_to_delete",
-            keyvalues=keyvalues,
-            retcols=(
-                "room_id",
-                "delete_id",
-                "action",
-                "status",
-                "timestamp",
-                "params",
-                "response",
-                "error",
-            ),
-            desc="rooms_to_delete_fetch",
-        )
-
-    async def get_room_to_delete(self, delete_id: str) -> Optional[Dict[str, Any]]:
-        """Return the delete job identified by delete_id."""
-        return await self.db_pool.simple_select_one(
-            table="rooms_to_delete",
-            keyvalues={"delete_id": delete_id},
-            retcols=(
-                "room_id",
-                "delete_id",
-                "action",
-                "status",
-                "timestamp",
-                "params",
-                "response",
-                "error",
-            ),
-            desc="rooms_to_delete_fetch",
-        )
-
     async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]:
         """Get all rooms that the user has ever been in.
 
diff --git a/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql b/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql
deleted file mode 100644
index 8f9c8c7010..0000000000
--- a/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql
+++ /dev/null
@@ -1,27 +0,0 @@
-/* Copyright 2023 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- cf upsert_room_to_delete docstring for the meaning of the fields.
-CREATE TABLE IF NOT EXISTS rooms_to_delete(
-    room_id text NOT NULL,
-    delete_id text NOT NULL,
-    action text NOT NULL,
-    status text NOT NULL,
-    timestamp bigint,
-    params text,
-    response text,
-    error text,
-    UNIQUE(room_id, delete_id)
-);
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index ba8afbc2b9..6b1138acb6 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -24,12 +24,11 @@ from twisted.test.proto_helpers import MemoryReactor
 import synapse.rest.admin
 from synapse.api.constants import EventTypes, Membership, RoomTypes
 from synapse.api.errors import Codes
-from synapse.handlers.pagination import DeleteStatus, PaginationHandler
 from synapse.rest.client import directory, events, login, room
 from synapse.server import HomeServer
 from synapse.types import UserID
 from synapse.util import Clock
-from synapse.util.stringutils import random_string
+from synapse.util.task_scheduler import TaskScheduler
 
 from tests import unittest
 
@@ -50,6 +49,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.task_scheduler = hs.get_task_scheduler()
         hs.config.consent.user_consent_version = "1"
 
         consent_uri_builder = Mock()
@@ -480,6 +480,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.task_scheduler = hs.get_task_scheduler()
         hs.config.consent.user_consent_version = "1"
 
         consent_uri_builder = Mock()
@@ -668,7 +669,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
         delete_id1 = channel.json_body["delete_id"]
 
         # go ahead
-        self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
+        self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
 
         # second task
         channel = self.make_request(
@@ -700,7 +701,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
 
         # get status after more than clearing time for first task
         # second task is not cleared
-        self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
+        self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
 
         channel = self.make_request(
             "GET",
@@ -714,7 +715,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
         self.assertEqual(delete_id2, channel.json_body["results"][0]["delete_id"])
 
         # get status after more than clearing time for all tasks
-        self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
+        self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
 
         channel = self.make_request(
             "GET",
@@ -725,48 +726,6 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
         self.assertEqual(404, channel.code, msg=channel.json_body)
         self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
 
-    def test_delete_same_room_twice(self) -> None:
-        """Test that the call for delete a room at second time gives an exception."""
-
-        body = {"new_room_user_id": self.admin_user}
-
-        # first call to delete room
-        # and do not wait for finish the task
-        first_channel = self.make_request(
-            "DELETE",
-            self.url.encode("ascii"),
-            content=body,
-            access_token=self.admin_user_tok,
-            await_result=False,
-        )
-
-        # second call to delete room
-        second_channel = self.make_request(
-            "DELETE",
-            self.url.encode("ascii"),
-            content=body,
-            access_token=self.admin_user_tok,
-        )
-
-        self.assertEqual(400, second_channel.code, msg=second_channel.json_body)
-        self.assertEqual(Codes.UNKNOWN, second_channel.json_body["errcode"])
-        self.assertEqual(
-            f"Purge already in progress for {self.room_id}",
-            second_channel.json_body["error"],
-        )
-
-        # get result of first call
-        first_channel.await_result()
-        self.assertEqual(200, first_channel.code, msg=first_channel.json_body)
-        self.assertIn("delete_id", first_channel.json_body)
-
-        # check status after finish the task
-        self._test_result(
-            first_channel.json_body["delete_id"],
-            self.other_user,
-            expect_new_room=True,
-        )
-
     def test_purge_room_and_block(self) -> None:
         """Test to purge a room and block it.
         Members will not be moved to a new room and will not receive a message.
@@ -1005,7 +964,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
 
         self._is_purged(room_id)
 
-    def test_resume_purge_room(self) -> None:
+    def test_scheduled_purge_room(self) -> None:
         # Create a test room
         room_id = self.helper.create_room_as(
             self.admin_user,
@@ -1013,12 +972,12 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
         )
         self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
 
+        # Schedule a purge 10 seconds in the future
         self.get_success(
-            self.store.upsert_room_to_delete(
-                room_id,
-                random_string(16),
-                DeleteStatus.ACTION_PURGE,
-                DeleteStatus.STATUS_PURGING,
+            self.task_scheduler.schedule_task(
+                "purge_room",
+                resource_id=room_id,
+                timestamp=self.clock.time_msec() + 10 * 1000,
             )
         )
 
@@ -1026,38 +985,34 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
         with self.assertRaises(AssertionError):
             self._is_purged(room_id)
 
-        # Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that
-        # the automatic purging takes place and resumes the purge
+        # Advance one hour in the future past `TaskScheduler.SCHEDULE_INTERVAL_MS` so that
+        # the automatic purging takes place and launch the purge
         self.reactor.advance(ONE_HOUR_IN_S)
 
         self._is_purged(room_id)
 
-    def test_resume_shutdown_room(self) -> None:
+    def test_schedule_shutdown_room(self) -> None:
         # Create a test room
         room_id = self.helper.create_room_as(
             self.other_user,
             tok=self.other_user_tok,
         )
 
-        delete_id = random_string(16)
-
-        self.get_success(
-            self.store.upsert_room_to_delete(
-                room_id,
-                delete_id,
-                DeleteStatus.ACTION_SHUTDOWN,
-                DeleteStatus.STATUS_SHUTTING_DOWN,
-                params=json.dumps(
-                    {
-                        "requester_user_id": self.admin_user,
-                        "new_room_user_id": self.admin_user,
-                        "new_room_name": None,
-                        "message": None,
-                        "block": False,
-                        "purge": True,
-                        "force_purge": True,
-                    }
-                ),
+        # Schedule a shutdown 10 seconds in the future
+        delete_id = self.get_success(
+            self.task_scheduler.schedule_task(
+                "shutdown_and_purge_room",
+                resource_id=room_id,
+                params={
+                    "requester_user_id": self.admin_user,
+                    "new_room_user_id": self.admin_user,
+                    "new_room_name": None,
+                    "message": None,
+                    "block": False,
+                    "purge": True,
+                    "force_purge": True,
+                },
+                timestamp=self.clock.time_msec() + 10 * 1000,
             )
         )
 
@@ -1068,7 +1023,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
         with self.assertRaises(AssertionError):
             self._is_purged(room_id)
 
-        # Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that
+        # Advance one hour in the future past `TaskScheduler.SCHEDULE_INTERVAL_MS` so that
         # the automatic purging takes place and resumes the purge
         self.reactor.advance(ONE_HOUR_IN_S)
 
@@ -2081,14 +2036,11 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
         self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
 
         # Purge every event before the second event.
-        purge_id = random_string(16)
         self.get_success(
-            pagination_handler._purge_history(
-                purge_id=purge_id,
+            pagination_handler.purge_history(
                 room_id=self.room_id,
                 token=second_token_str,
                 delete_local_events=True,
-                update_rooms_to_delete_table=True,
             )
         )
 
diff --git a/tests/rest/admin/test_server_notice.py b/tests/rest/admin/test_server_notice.py
index d14da9fd0e..dfd14f5751 100644
--- a/tests/rest/admin/test_server_notice.py
+++ b/tests/rest/admin/test_server_notice.py
@@ -414,13 +414,12 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
         self.assertEqual(messages[0]["content"]["body"], "test msg one")
         self.assertEqual(messages[0]["sender"], "@notices:test")
 
-        delete_id = random_string(16)
+        random_string(16)
 
         # shut down and purge room
         self.get_success(
             self.room_shutdown_handler.shutdown_room(
                 first_room_id,
-                delete_id,
                 {
                     "requester_user_id": self.admin_user,
                     "new_room_user_id": None,
@@ -432,7 +431,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
                 },
             )
         )
-        self.get_success(self.pagination_handler.purge_room(first_room_id, "delete_id"))
+        self.get_success(self.pagination_handler.purge_room(first_room_id, force=False))
 
         # user is not member anymore
         self._check_invite_and_join_status(self.other_user, 0, 0)
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index 98c3f99d11..88643cca9e 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -2088,14 +2088,11 @@ class RoomMessageListTestCase(RoomBase):
         self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
 
         # Purge every event before the second event.
-        purge_id = random_string(16)
         self.get_success(
-            pagination_handler._purge_history(
-                purge_id=purge_id,
+            pagination_handler.purge_history(
                 room_id=self.room_id,
                 token=second_token_str,
                 delete_local_events=True,
-                update_rooms_to_delete_table=True,
             )
         )