summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMathieu Velten <mathieuv@matrix.org>2023-06-26 17:07:53 +0200
committerMathieu Velten <mathieuv@matrix.org>2023-06-30 15:58:11 +0200
commit5bce6397aa477f55a2eb89a44ee449bdc486f8c4 (patch)
treef9d38d81f56de6d4e2ea89b65343d891250aee45 /synapse
parentcomments (diff)
downloadsynapse-5bce6397aa477f55a2eb89a44ee449bdc486f8c4.tar.xz
Use DB for all purge/shutdown actions, including purge history
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/pagination.py239
-rw-r--r--synapse/handlers/room.py45
-rw-r--r--synapse/handlers/room_member.py15
-rw-r--r--synapse/rest/admin/__init__.py5
-rw-r--r--synapse/rest/admin/rooms.py20
-rw-r--r--synapse/storage/databases/main/roommember.py80
-rw-r--r--synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql (renamed from synapse/storage/schema/main/delta/78/03_rooms_to_purge.sql)11
7 files changed, 238 insertions, 177 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 4ed4c301e2..adedff94bd 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -14,9 +14,7 @@
 # limitations under the License.
 import json
 import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Set
-
-import attr
+from typing import TYPE_CHECKING, List, Optional, Set
 
 from twisted.python.failure import Failure
 
@@ -29,7 +27,7 @@ from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.rest.admin._base import assert_user_is_admin
 from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType
+from synapse.types import JsonDict, Requester, StreamKeyType
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import ReadWriteLock
 from synapse.util.stringutils import random_string
@@ -42,37 +40,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-@attr.s(slots=True, auto_attribs=True)
-class PurgeStatus:
-    """Object tracking the status of a purge request
-
-    This class contains information on the progress of a purge request, for
-    return by get_purge_status.
-    """
-
-    STATUS_ACTIVE = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-
-    STATUS_TEXT = {
-        STATUS_ACTIVE: "active",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-    }
-
-    # Save the error message if an error occurs
-    error: str = ""
-
-    # Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
-    status: int = STATUS_ACTIVE
-
-    def asdict(self) -> JsonDict:
-        ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
-        if self.error:
-            ret["error"] = self.error
-        return ret
-
-
 class PaginationHandler:
     """Handles pagination and purge history requests.
 
@@ -100,13 +67,6 @@ class PaginationHandler:
         self.pagination_lock = ReadWriteLock()
         # IDs of rooms in which there currently an active purge *or delete* operation.
         self._purges_in_progress_by_room: Set[str] = set()
-        # map from purge id to PurgeStatus
-        self._purges_by_id: Dict[str, PurgeStatus] = {}
-        # map from purge id to DeleteStatus
-        self._delete_by_id: Dict[str, DeleteStatus] = {}
-        # map from room id to delete ids
-        # Dict[`room_id`, List[`delete_id`]]
-        self._delete_by_room: Dict[str, List[str]] = {}
         self._event_serializer = hs.get_event_client_serializer()
 
         self._retention_default_max_lifetime = (
@@ -151,11 +111,12 @@ class PaginationHandler:
 
         It should be run regularly.
         """
-        rooms_to_purge = await self.store.get_rooms_to_purge()
-        for r in rooms_to_purge:
+        rooms_to_delete = await self.store.get_rooms_to_delete()
+        for r in rooms_to_delete:
             room_id = r["room_id"]
             delete_id = r["delete_id"]
             status = r["status"]
+            action = r["action"]
             timestamp = r["timestamp"]
 
             if (
@@ -165,32 +126,25 @@ class PaginationHandler:
                 # remove the delete from the list 24 hours after it completes or fails
                 ms_since_completed = self.clock.time_msec() - timestamp
                 if ms_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS:
-                    await self.store.delete_room_to_purge(room_id, delete_id)
-
-                    del self._delete_by_id[delete_id]
-                    self._delete_by_room[room_id].remove(delete_id)
-                    if not self._delete_by_room[room_id]:
-                        del self._delete_by_room[room_id]
+                    await self.store.delete_room_to_delete(room_id, delete_id)
 
                 continue
 
-            delete_status = self._delete_by_id.get(delete_id)
-            if delete_status is not None:
+            if room_id in self._purges_in_progress_by_room:
                 # a delete background task is already running (or has run)
-                # for this delete id, let's ignore it
+                # for this room id, let's ignore it for now
                 continue
 
-            self._delete_by_id[delete_id] = DeleteStatus()
-            self._delete_by_id[delete_id].status = status
-            self._delete_by_room.setdefault(room_id, []).append(delete_id)
-
             # If the database says we were last in the middle of shutting down the room,
             # let's continue the shutdown process.
             shutdown_response = None
-            if status == DeleteStatus.STATUS_SHUTTING_DOWN:
-                shutdown_params = json.loads(r["shutdown_params"])
-                if r["shutdown_response"]:
-                    shutdown_response = json.loads(r["shutdown_response"])
+            if (
+                action == DeleteStatus.ACTION_SHUTDOWN
+                and status == DeleteStatus.STATUS_SHUTTING_DOWN
+            ):
+                shutdown_params = json.loads(r["params"])
+                if r["response"]:
+                    shutdown_response = json.loads(r["response"])
                 await self._shutdown_and_purge_room(
                     room_id,
                     delete_id,
@@ -204,16 +158,33 @@ class PaginationHandler:
             if status == DeleteStatus.STATUS_PURGING:
                 purge_now = True
             # Or if we're at or past the scheduled purge time, let's start that one as well
-            elif status == DeleteStatus.STATUS_SCHEDULED_PURGE and (
+            elif status == DeleteStatus.STATUS_SCHEDULED and (
                 timestamp is None or self.clock.time_msec() >= timestamp
             ):
                 purge_now = True
 
             # TODO 2 stages purge, keep memberships for a while so we don't "break" sync
             if purge_now:
-                await self.purge_room(
-                    room_id, delete_id, True, shutdown_response=shutdown_response
-                )
+                params = {}
+                if r["params"]:
+                    params = json.loads(r["params"])
+
+                if action == DeleteStatus.ACTION_PURGE_HISTORY:
+                    if "token" in params:
+                        await self._purge_history(
+                            delete_id,
+                            room_id,
+                            params["token"],
+                            params.get("delete_local_events", False),
+                            True,
+                        )
+                elif action == DeleteStatus.ACTION_PURGE:
+                    await self.purge_room(
+                        room_id,
+                        delete_id,
+                        params.get("force", False),
+                        shutdown_response=shutdown_response,
+                    )
 
     async def purge_history_for_rooms_in_range(
         self, min_ms: Optional[int], max_ms: Optional[int]
@@ -323,8 +294,6 @@ class PaginationHandler:
 
             purge_id = random_string(16)
 
-            self._purges_by_id[purge_id] = PurgeStatus()
-
             logger.info(
                 "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
             )
@@ -339,9 +308,10 @@ class PaginationHandler:
                 room_id,
                 token,
                 True,
+                False,
             )
 
-    def start_purge_history(
+    async def start_purge_history(
         self, room_id: str, token: str, delete_local_events: bool = False
     ) -> str:
         """Start off a history purge on a room.
@@ -366,7 +336,16 @@ class PaginationHandler:
         # request id in the log lines.
         logger.info("[purge] starting purge_id %s", purge_id)
 
-        self._purges_by_id[purge_id] = PurgeStatus()
+        await self.store.upsert_room_to_delete(
+            room_id,
+            purge_id,
+            DeleteStatus.ACTION_PURGE_HISTORY,
+            DeleteStatus.STATUS_PURGING,
+            params=json.dumps(
+                {"token": token, "delete_local_events": delete_local_events}
+            ),
+        )
+
         run_as_background_process(
             "purge_history",
             self._purge_history,
@@ -374,11 +353,17 @@ class PaginationHandler:
             room_id,
             token,
             delete_local_events,
+            True,
         )
         return purge_id
 
     async def _purge_history(
-        self, purge_id: str, room_id: str, token: str, delete_local_events: bool
+        self,
+        purge_id: str,
+        room_id: str,
+        token: str,
+        delete_local_events: bool,
+        update_rooms_to_delete_table: bool,
     ) -> None:
         """Carry out a history purge on a room.
 
@@ -387,6 +372,10 @@ class PaginationHandler:
             room_id: The room to purge from
             token: topological token to delete events before
             delete_local_events: True to delete local events as well as remote ones
+            update_rooms_to_delete_table: True if we don't want to update/persist this
+                purge history action to the DB to be restorable. Used with the retention
+                functionality since we don't need to explicitly restore those, they
+                will be relaunch by the retention logic.
         """
         self._purges_in_progress_by_room.add(room_id)
         try:
@@ -395,48 +384,75 @@ class PaginationHandler:
                     room_id, token, delete_local_events
                 )
             logger.info("[purge] complete")
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+            if update_rooms_to_delete_table:
+                await self.store.upsert_room_to_delete(
+                    room_id,
+                    purge_id,
+                    DeleteStatus.ACTION_PURGE_HISTORY,
+                    DeleteStatus.STATUS_COMPLETE,
+                    timestamp=self.clock.time_msec(),
+                )
         except Exception:
             f = Failure()
             logger.error(
                 "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
             )
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
-            self._purges_by_id[purge_id].error = f.getErrorMessage()
+            if update_rooms_to_delete_table:
+                await self.store.upsert_room_to_delete(
+                    room_id,
+                    purge_id,
+                    DeleteStatus.ACTION_PURGE_HISTORY,
+                    DeleteStatus.STATUS_FAILED,
+                    error=f.getErrorMessage(),
+                    timestamp=self.clock.time_msec(),
+                )
         finally:
             self._purges_in_progress_by_room.discard(room_id)
 
-            # remove the purge from the list 24 hours after it completes
-            def clear_purge() -> None:
-                del self._purges_by_id[purge_id]
-
-            self.hs.get_reactor().callLater(
-                PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
-            )
-
-    def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
-        """Get the current status of an active purge
+            if update_rooms_to_delete_table:
+                # remove the purge from the list 24 hours after it completes
+                async def clear_purge() -> None:
+                    await self.store.delete_room_to_delete(room_id, purge_id)
 
-        Args:
-            purge_id: purge_id returned by start_purge_history
-        """
-        return self._purges_by_id.get(purge_id)
+                self.hs.get_reactor().callLater(
+                    PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
+                )
 
-    def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
+    async def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
         """Get the current status of an active deleting
 
         Args:
             delete_id: delete_id returned by start_shutdown_and_purge_room
+                or start_purge_history.
         """
-        return self._delete_by_id.get(delete_id)
-
-    def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
-        """Get all active delete ids by room
+        res = await self.store.get_room_to_delete(delete_id)
+        if res:
+            status = DeleteStatus()
+            status.delete_id = res["delete_id"]
+            status.action = res["action"]
+            status.status = res["status"]
+            if status.action == DeleteStatus.ACTION_SHUTDOWN and res["response"]:
+                status.shutdown_room = json.loads(res["response"])
+            return status
+        return None
+
+    async def get_delete_statuses_by_room(self, room_id: str) -> List[DeleteStatus]:
+        """Get all active delete statuses by room
 
         Args:
             room_id: room_id that is deleted
         """
-        return self._delete_by_room.get(room_id)
+        res = await self.store.get_rooms_to_delete(room_id)
+        statuses = []
+        for r in res:
+            status = DeleteStatus()
+            status.delete_id = r["delete_id"]
+            status.action = r["action"]
+            status.status = r["status"]
+            if status.action == DeleteStatus.ACTION_SHUTDOWN and r["response"]:
+                status.shutdown_room = json.loads(r["response"])
+            statuses.append(status)
+        return statuses
 
     async def purge_room(
         self,
@@ -455,6 +471,10 @@ class PaginationHandler:
         """
         logger.info("starting purge room_id=%s force=%s", room_id, force)
 
+        action = DeleteStatus.ACTION_PURGE
+        if shutdown_response:
+            action = DeleteStatus.ACTION_SHUTDOWN
+
         async with self.pagination_lock.write(room_id):
             # first check that we have no users in this room
             if not force:
@@ -462,21 +482,23 @@ class PaginationHandler:
                 if joined:
                     raise SynapseError(400, "Users are still joined to this room")
 
-            await self.store.upsert_room_to_purge(
+            await self.store.upsert_room_to_delete(
                 room_id,
                 delete_id,
+                action,
                 DeleteStatus.STATUS_PURGING,
-                shutdown_response=json.dumps(shutdown_response),
+                response=json.dumps(shutdown_response),
             )
 
             await self._storage_controllers.purge_events.purge_room(room_id)
 
-            await self.store.upsert_room_to_purge(
+            await self.store.upsert_room_to_delete(
                 room_id,
                 delete_id,
+                action,
                 DeleteStatus.STATUS_COMPLETE,
                 timestamp=self.clock.time_msec(),
-                shutdown_response=json.dumps(shutdown_response),
+                response=json.dumps(shutdown_response),
             )
 
         logger.info("purge complete for room_id %s", room_id)
@@ -691,44 +713,41 @@ class PaginationHandler:
 
         self._purges_in_progress_by_room.add(room_id)
         try:
-            self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
             shutdown_response = await self._room_shutdown_handler.shutdown_room(
                 room_id=room_id,
                 delete_id=delete_id,
                 shutdown_params=shutdown_params,
                 shutdown_response=shutdown_response,
             )
-            self._delete_by_id[delete_id].shutdown_room = shutdown_response
 
             if shutdown_params["purge"]:
-                self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
                 await self.purge_room(
                     room_id,
                     delete_id,
                     shutdown_params["force_purge"],
-                    shutdown_response=self._delete_by_id[delete_id].shutdown_room,
+                    shutdown_response=shutdown_response,
                 )
 
-            await self.store.upsert_room_to_purge(
+            await self.store.upsert_room_to_delete(
                 room_id,
                 delete_id,
+                DeleteStatus.ACTION_SHUTDOWN,
                 DeleteStatus.STATUS_COMPLETE,
                 timestamp=self.clock.time_msec(),
-                shutdown_response=json.dumps(shutdown_response),
+                response=json.dumps(shutdown_response),
             )
-            self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
         except Exception:
             f = Failure()
             logger.error(
                 "failed",
                 exc_info=(f.type, f.value, f.getTracebackObject()),
             )
-            self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
-            self._delete_by_id[delete_id].error = f.getErrorMessage()
-            await self.store.upsert_room_to_purge(
+            await self.store.upsert_room_to_delete(
                 room_id,
                 delete_id,
+                DeleteStatus.ACTION_SHUTDOWN,
                 DeleteStatus.STATUS_FAILED,
+                timestamp=self.clock.time_msec(),
                 error=f.getErrorMessage(),
             )
         finally:
@@ -749,9 +768,7 @@ class PaginationHandler:
             unique ID for this delete transaction.
         """
         if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400, "History purge already in progress for %s" % (room_id,)
-            )
+            raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
 
         # This check is double to `RoomShutdownHandler.shutdown_room`
         # But here the requester get a direct response / error with HTTP request
@@ -773,8 +790,6 @@ class PaginationHandler:
             delete_id,
         )
 
-        self._delete_by_id[delete_id] = DeleteStatus()
-        self._delete_by_room.setdefault(room_id, []).append(delete_id)
         run_as_background_process(
             "shutdown_and_purge_room",
             self._shutdown_and_purge_room,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 0f04038068..27e3d1d739 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1815,13 +1815,21 @@ class DeleteStatus:
     return by get_delete_status.
     """
 
+    ACTION_SHUTDOWN = "shutdown"
+    ACTION_PURGE = "purge"
+    ACTION_PURGE_HISTORY = "purge_history"
+
+    # Scheduled delete waiting to be launch at a specific time
+    STATUS_SCHEDULED = "scheduled"
     STATUS_SHUTTING_DOWN = "shutting_down"
-    # Scheduled purge waiting to be launch at a specific time
-    STATUS_SCHEDULED_PURGE = "scheduled_purge"
     STATUS_PURGING = "purging"
     STATUS_COMPLETE = "complete"
     STATUS_FAILED = "failed"
 
+    delete_id: str = ""
+
+    action: str = ACTION_PURGE
+
     # Tracks whether this request has completed.
     # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN,WAIT_PURGE}.
     status: str = STATUS_PURGING
@@ -1839,6 +1847,7 @@ class DeleteStatus:
 
     def asdict(self) -> JsonDict:
         ret = {
+            "delete_id": self.delete_id,
             "status": self.status,
             "shutdown_room": self.shutdown_room,
         }
@@ -1925,12 +1934,13 @@ class RoomShutdownHandler:
                 "new_room_id": None,
             }
 
-        await self.store.upsert_room_to_purge(
+        await self.store.upsert_room_to_delete(
             room_id,
             delete_id,
+            DeleteStatus.ACTION_SHUTDOWN,
             DeleteStatus.STATUS_SHUTTING_DOWN,
-            shutdown_params=json.dumps(shutdown_params),
-            shutdown_response=json.dumps(shutdown_response),
+            params=json.dumps(shutdown_params),
+            response=json.dumps(shutdown_response),
         )
 
         # Action the block first (even if the room doesn't exist yet)
@@ -1965,12 +1975,13 @@ class RoomShutdownHandler:
             )
 
             shutdown_response["new_room_id"] = new_room_id
-            await self.store.upsert_room_to_purge(
+            await self.store.upsert_room_to_delete(
                 room_id,
                 delete_id,
+                DeleteStatus.ACTION_SHUTDOWN,
                 DeleteStatus.STATUS_SHUTTING_DOWN,
-                shutdown_params=json.dumps(shutdown_params),
-                shutdown_response=json.dumps(shutdown_response),
+                params=json.dumps(shutdown_params),
+                response=json.dumps(shutdown_response),
             )
 
             logger.info(
@@ -2017,7 +2028,9 @@ class RoomShutdownHandler:
                     stream_id,
                 )
 
-                await self.room_member_handler.forget(target_requester.user, room_id)
+                await self.room_member_handler.forget(
+                    target_requester.user, room_id, do_not_schedule_purge=True
+                )
 
                 # Join users to new room
                 if new_room_user_id:
@@ -2033,24 +2046,26 @@ class RoomShutdownHandler:
                     )
 
                 shutdown_response["kicked_users"].append(user_id)
-                await self.store.upsert_room_to_purge(
+                await self.store.upsert_room_to_delete(
                     room_id,
                     delete_id,
+                    DeleteStatus.ACTION_SHUTDOWN,
                     DeleteStatus.STATUS_SHUTTING_DOWN,
-                    shutdown_params=json.dumps(shutdown_params),
-                    shutdown_response=json.dumps(shutdown_response),
+                    params=json.dumps(shutdown_params),
+                    response=json.dumps(shutdown_response),
                 )
             except Exception:
                 logger.exception(
                     "Failed to leave old room and join new room for %r", user_id
                 )
                 shutdown_response["failed_to_kick_users"].append(user_id)
-                await self.store.upsert_room_to_purge(
+                await self.store.upsert_room_to_delete(
                     room_id,
                     delete_id,
+                    DeleteStatus.ACTION_SHUTDOWN,
                     DeleteStatus.STATUS_SHUTTING_DOWN,
-                    shutdown_params=json.dumps(shutdown_params),
-                    shutdown_response=json.dumps(shutdown_response),
+                    params=json.dumps(shutdown_params),
+                    response=json.dumps(shutdown_response),
                 )
 
         # Send message in new room and move aliases
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 27f1e3a615..170c071b50 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -289,7 +289,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         """
         raise NotImplementedError()
 
-    async def forget(self, user: UserID, room_id: str) -> None:
+    async def forget(
+        self, user: UserID, room_id: str, do_not_schedule_purge: bool = False
+    ) -> None:
         user_id = user.to_string()
 
         member = await self._storage_controllers.state.get_current_state_event(
@@ -311,14 +313,17 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
         # If everyone locally has left the room, then there is no reason for us to keep the
         # room around and we automatically purge room after a little bit
-        if self._purge_retention_period and await self.store.is_locally_forgotten_room(
-            room_id
+        if (
+            not do_not_schedule_purge
+            and self._purge_retention_period
+            and await self.store.is_locally_forgotten_room(room_id)
         ):
             delete_id = random_string(16)
-            await self.store.upsert_room_to_purge(
+            await self.store.upsert_room_to_delete(
                 room_id,
                 delete_id,
-                DeleteStatus.STATUS_SCHEDULED_PURGE,
+                DeleteStatus.ACTION_PURGE,
+                DeleteStatus.STATUS_SCHEDULED,
                 timestamp=self.clock.time_msec() + self._purge_retention_period,
             )
 
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index fe8177ed4d..dbdcef7ae6 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -196,7 +196,7 @@ class PurgeHistoryRestServlet(RestServlet):
                 errcode=Codes.BAD_JSON,
             )
 
-        purge_id = self.pagination_handler.start_purge_history(
+        purge_id = await self.pagination_handler.start_purge_history(
             room_id, token, delete_local_events=delete_local_events
         )
 
@@ -215,10 +215,11 @@ class PurgeHistoryStatusRestServlet(RestServlet):
     ) -> Tuple[int, JsonDict]:
         await assert_requester_is_admin(self.auth, request)
 
-        purge_status = self.pagination_handler.get_purge_status(purge_id)
+        purge_status = await self.pagination_handler.get_delete_status(purge_id)
         if purge_status is None:
             raise NotFoundError("purge id '%s' not found" % purge_id)
 
+        # TODO active vs purging etc
         return HTTPStatus.OK, purge_status.asdict()
 
 
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 5e4b0b4759..ed54c8bb64 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -154,20 +154,14 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
                 HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,)
             )
 
-        delete_ids = self._pagination_handler.get_delete_ids_by_room(room_id)
-        if delete_ids is None:
-            delete_ids = []
+        delete_statuses = await self._pagination_handler.get_delete_statuses_by_room(
+            room_id
+        )
 
         response = []
-        for delete_id in delete_ids:
-            delete = self._pagination_handler.get_delete_status(delete_id)
-            if delete and delete.status != DeleteStatus.STATUS_SCHEDULED_PURGE:
-                response += [
-                    {
-                        "delete_id": delete_id,
-                        **delete.asdict(),
-                    }
-                ]
+        for delete_status in delete_statuses:
+            if delete_status.status != DeleteStatus.STATUS_SCHEDULED:
+                response += [delete_status.asdict()]
 
         if response:
             return HTTPStatus.OK, {"results": cast(JsonDict, response)}
@@ -189,7 +183,7 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
     ) -> Tuple[int, JsonDict]:
         await assert_requester_is_admin(self._auth, request)
 
-        delete_status = self._pagination_handler.get_delete_status(delete_id)
+        delete_status = await self._pagination_handler.get_delete_status(delete_id)
         if delete_status is None:
             raise NotFoundError("delete id '%s' not found" % delete_id)
 
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index a4d806828f..2c0b573d4c 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -1284,47 +1284,48 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         # If any rows still exist it means someone has not forgotten this room yet
         return not rows[0][0]
 
-    async def upsert_room_to_purge(
+    async def upsert_room_to_delete(
         self,
         room_id: str,
         delete_id: str,
+        action: str,
         status: str,
-        error: Optional[str] = None,
         timestamp: Optional[int] = None,
-        shutdown_params: Optional[str] = None,
-        shutdown_response: Optional[str] = None,
+        params: Optional[str] = None,
+        response: Optional[str] = None,
+        error: Optional[str] = None,
     ) -> None:
         """Insert or update a room to shutdown/purge.
 
         Args:
             room_id: The room ID to shutdown/purge
             delete_id: The delete ID identifying this action
+            action: the type of job, mainly `shutdown` `purge` or `purge_history`
             status: Current status of the delete. Cf `DeleteStatus` for possible values
-            error: Error message to return, if any
             timestamp: Time of the last update. If status is `wait_purge`,
                 then it specifies when to do the purge, with an empty value specifying ASAP
-            shutdown_params: JSON representation of shutdown parameters, cf `ShutdownRoomParams`
-            shutdown_response: JSON representation of shutdown current status, cf `ShutdownRoomResponse`
+            error: Error message to return, if any
+            params: JSON representation of delete job parameters
+            response: JSON representation of delete current status
         """
         await self.db_pool.simple_upsert(
-            "rooms_to_purge",
+            "rooms_to_delete",
             {
                 "room_id": room_id,
                 "delete_id": delete_id,
             },
             {
-                "room_id": room_id,
-                "delete_id": delete_id,
+                "action": action,
                 "status": status,
-                "error": error,
                 "timestamp": timestamp,
-                "shutdown_params": shutdown_params,
-                "shutdown_response": shutdown_response,
+                "params": params,
+                "response": response,
+                "error": error,
             },
-            desc="upsert_room_to_purge",
+            desc="upsert_room_to_delete",
         )
 
-    async def delete_room_to_purge(self, room_id: str, delete_id: str) -> None:
+    async def delete_room_to_delete(self, room_id: str, delete_id: str) -> None:
         """Remove a room from the list of rooms to purge.
 
         Args:
@@ -1333,32 +1334,61 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         """
 
         await self.db_pool.simple_delete(
-            "rooms_to_purge",
+            "rooms_to_delete",
             keyvalues={
                 "room_id": room_id,
                 "delete_id": delete_id,
             },
-            desc="delete_room_to_purge",
+            desc="delete_room_to_delete",
         )
 
-    async def get_rooms_to_purge(self) -> List[Dict[str, Any]]:
-        """Returns all rooms to shutdown/purge. This includes those that have
-        been interrupted by a stop/restart of synapse, but also scheduled ones
+    async def get_rooms_to_delete(
+        self, room_id: Optional[str] = None
+    ) -> List[Dict[str, Any]]:
+        """Returns all delete jobs. This includes those that have been
+        interrupted by a stop/restart of synapse, but also scheduled ones
         like locally forgotten rooms.
+
+        Args:
+            room_id: if specified, will only return the delete jobs for a specific room
+
         """
+        keyvalues = {}
+        if room_id is not None:
+            keyvalues["room_id"] = room_id
+
         return await self.db_pool.simple_select_list(
-            table="rooms_to_purge",
-            keyvalues={},
+            table="rooms_to_delete",
+            keyvalues=keyvalues,
             retcols=(
                 "room_id",
                 "delete_id",
+                "action",
+                "status",
                 "timestamp",
+                "params",
+                "response",
+                "error",
+            ),
+            desc="rooms_to_delete_fetch",
+        )
+
+    async def get_room_to_delete(self, delete_id: str) -> Optional[Dict[str, Any]]:
+        """Return the delete job identified by delete_id."""
+        return await self.db_pool.simple_select_one(
+            table="rooms_to_delete",
+            keyvalues={"delete_id": delete_id},
+            retcols=(
+                "room_id",
+                "delete_id",
+                "action",
                 "status",
+                "timestamp",
+                "params",
+                "response",
                 "error",
-                "shutdown_params",
-                "shutdown_response",
             ),
-            desc="rooms_to_purge_fetch",
+            desc="rooms_to_delete_fetch",
         )
 
     async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]:
diff --git a/synapse/storage/schema/main/delta/78/03_rooms_to_purge.sql b/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql
index ef541e11f4..8f9c8c7010 100644
--- a/synapse/storage/schema/main/delta/78/03_rooms_to_purge.sql
+++ b/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql
@@ -13,14 +13,15 @@
  * limitations under the License.
  */
 
--- cf upsert_room_to_purge docstring for the meaning of the fields.
-CREATE TABLE IF NOT EXISTS rooms_to_purge(
+-- cf upsert_room_to_delete docstring for the meaning of the fields.
+CREATE TABLE IF NOT EXISTS rooms_to_delete(
     room_id text NOT NULL,
     delete_id text NOT NULL,
+    action text NOT NULL,
     status text NOT NULL,
-    error text,
     timestamp bigint,
-    shutdown_params text,
-    shutdown_response text,
+    params text,
+    response text,
+    error text,
     UNIQUE(room_id, delete_id)
 );