summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/15488.feature1
-rw-r--r--docs/usage/configuration/config_documentation.md19
-rw-r--r--synapse/config/server.py9
-rw-r--r--synapse/handlers/pagination.py491
-rw-r--r--synapse/handlers/room.py250
-rw-r--r--synapse/handlers/room_member.py24
-rw-r--r--synapse/rest/admin/__init__.py7
-rw-r--r--synapse/rest/admin/rooms.py67
-rw-r--r--synapse/storage/databases/main/roommember.py108
-rw-r--r--synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql27
-rw-r--r--tests/rest/admin/test_room.py134
-rw-r--r--tests/rest/admin/test_server_notice.py21
-rw-r--r--tests/rest/client/test_rooms.py3
13 files changed, 810 insertions, 351 deletions
diff --git a/changelog.d/15488.feature b/changelog.d/15488.feature
new file mode 100644
index 0000000000..8684d84192
--- /dev/null
+++ b/changelog.d/15488.feature
@@ -0,0 +1 @@
+Add automatic purge after all users forgotten a room. Also add restore of purge/shutdown rooms after a synapse restart.
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 22cd1772dc..69b4a6b035 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -923,7 +923,7 @@ allowed_avatar_mimetypes: ["image/png", "image/jpeg", "image/gif"]
 How long to keep redacted events in unredacted form in the database. After
 this period redacted events get replaced with their redacted form in the DB.
 
-Synapse will check whether the rentention period has concluded for redacted
+Synapse will check whether the retention period has concluded for redacted
 events every 5 minutes. Thus, even if this option is set to `0`, Synapse may
 still take up to 5 minutes to purge redacted events from the database.
 
@@ -934,6 +934,23 @@ Example configuration:
 redaction_retention_period: 28d
 ```
 ---
+---
+### `purge_retention_period`
+
+How long to keep locally forgotten room in the DB. After this period the room
+will be fully purged from the DB.
+
+Synapse will check whether the retention period has concluded for room
+purges every hour. Thus, even if this option is set to `0`, Synapse may
+still take up to one hour to purge forgotten rooms from the database.
+
+Defaults to `7d`. Set to `null` to disable.
+
+Example configuration:
+```yaml
+purge_retention_period: 28d
+```
+---
 ### `user_ips_max_age`
 
 How long to track users' last seen time and IPs in the database.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index b46fa51593..cb93e56678 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -486,6 +486,15 @@ class ServerConfig(Config):
         else:
             self.redaction_retention_period = None
 
+        # How long to keep locally forgotten rooms before purging them.
+        purge_retention_period = config.get("purge_retention_period", "7d")
+        if purge_retention_period is not None:
+            self.purge_retention_period: Optional[int] = self.parse_duration(
+                purge_retention_period
+            )
+        else:
+            self.purge_retention_period = None
+
         # How long to keep entries in the `users_ips` table.
         user_ips_max_age = config.get("user_ips_max_age", "28d")
         if user_ips_max_age is not None:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 19b8728db9..982b38cc43 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -12,10 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import json
 import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Set
-
-import attr
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set
 
 from twisted.python.failure import Failure
 
@@ -23,12 +22,12 @@ from synapse.api.constants import Direction, EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
 from synapse.events.utils import SerializeEventConfig
-from synapse.handlers.room import ShutdownRoomResponse
+from synapse.handlers.room import DeleteStatus, ShutdownRoomParams, ShutdownRoomResponse
 from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.rest.admin._base import assert_user_is_admin
 from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType
+from synapse.types import JsonDict, Requester, StreamKeyType
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import ReadWriteLock
 from synapse.util.stringutils import random_string
@@ -46,82 +45,6 @@ logger = logging.getLogger(__name__)
 BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
 
 
-@attr.s(slots=True, auto_attribs=True)
-class PurgeStatus:
-    """Object tracking the status of a purge request
-
-    This class contains information on the progress of a purge request, for
-    return by get_purge_status.
-    """
-
-    STATUS_ACTIVE = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-
-    STATUS_TEXT = {
-        STATUS_ACTIVE: "active",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-    }
-
-    # Save the error message if an error occurs
-    error: str = ""
-
-    # Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
-    status: int = STATUS_ACTIVE
-
-    def asdict(self) -> JsonDict:
-        ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
-        if self.error:
-            ret["error"] = self.error
-        return ret
-
-
-@attr.s(slots=True, auto_attribs=True)
-class DeleteStatus:
-    """Object tracking the status of a delete room request
-
-    This class contains information on the progress of a delete room request, for
-    return by get_delete_status.
-    """
-
-    STATUS_PURGING = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-    STATUS_SHUTTING_DOWN = 3
-
-    STATUS_TEXT = {
-        STATUS_PURGING: "purging",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-        STATUS_SHUTTING_DOWN: "shutting_down",
-    }
-
-    # Tracks whether this request has completed.
-    # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
-    status: int = STATUS_PURGING
-
-    # Save the error message if an error occurs
-    error: str = ""
-
-    # Saves the result of an action to give it back to REST API
-    shutdown_room: ShutdownRoomResponse = {
-        "kicked_users": [],
-        "failed_to_kick_users": [],
-        "local_aliases": [],
-        "new_room_id": None,
-    }
-
-    def asdict(self) -> JsonDict:
-        ret = {
-            "status": DeleteStatus.STATUS_TEXT[self.status],
-            "shutdown_room": self.shutdown_room,
-        }
-        if self.error:
-            ret["error"] = self.error
-        return ret
-
-
 class PaginationHandler:
     """Handles pagination and purge history requests.
 
@@ -132,6 +55,9 @@ class PaginationHandler:
     # when to remove a completed deletion/purge from the results map
     CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24  # 24 hours
 
+    # how often to run the purge rooms loop
+    PURGE_ROOMS_INTERVAL_MS = 1000 * 3600  # 1 hour
+
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.auth = hs.get_auth()
@@ -146,13 +72,6 @@ class PaginationHandler:
         self.pagination_lock = ReadWriteLock()
         # IDs of rooms in which there currently an active purge *or delete* operation.
         self._purges_in_progress_by_room: Set[str] = set()
-        # map from purge id to PurgeStatus
-        self._purges_by_id: Dict[str, PurgeStatus] = {}
-        # map from purge id to DeleteStatus
-        self._delete_by_id: Dict[str, DeleteStatus] = {}
-        # map from room id to delete ids
-        # Dict[`room_id`, List[`delete_id`]]
-        self._delete_by_room: Dict[str, List[str]] = {}
         self._event_serializer = hs.get_event_client_serializer()
 
         self._retention_default_max_lifetime = (
@@ -165,6 +84,7 @@ class PaginationHandler:
         self._retention_allowed_lifetime_max = (
             hs.config.retention.retention_allowed_lifetime_max
         )
+        self._purge_retention_period = hs.config.server.purge_retention_period
         self._is_master = hs.config.worker.worker_app is None
 
         if hs.config.retention.retention_enabled and self._is_master:
@@ -181,6 +101,96 @@ class PaginationHandler:
                     job.longest_max_lifetime,
                 )
 
+        if self._is_master:
+            self.clock.looping_call(
+                run_as_background_process,
+                PaginationHandler.PURGE_ROOMS_INTERVAL_MS,
+                "purge_rooms",
+                self.purge_rooms,
+            )
+
+    async def purge_rooms(self) -> None:
+        """This takes care of restoring unfinished purge/shutdown rooms from the DB.
+        It also takes care to launch scheduled ones, like rooms that has been fully
+        forgotten.
+
+        It should be run regularly.
+        """
+        rooms_to_delete = await self.store.get_rooms_to_delete()
+        for r in rooms_to_delete:
+            room_id = r["room_id"]
+            delete_id = r["delete_id"]
+            status = r["status"]
+            action = r["action"]
+            timestamp = r["timestamp"]
+
+            if (
+                status == DeleteStatus.STATUS_COMPLETE
+                or status == DeleteStatus.STATUS_FAILED
+            ):
+                # remove the delete from the list 24 hours after it completes or fails
+                ms_since_completed = self.clock.time_msec() - timestamp
+                if ms_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS:
+                    await self.store.delete_room_to_delete(room_id, delete_id)
+
+                continue
+
+            if room_id in self._purges_in_progress_by_room:
+                # a delete background task is already running (or has run)
+                # for this room id, let's ignore it for now
+                continue
+
+            # If the database says we were last in the middle of shutting down the room,
+            # let's continue the shutdown process.
+            shutdown_response = None
+            if (
+                action == DeleteStatus.ACTION_SHUTDOWN
+                and status == DeleteStatus.STATUS_SHUTTING_DOWN
+            ):
+                shutdown_params = json.loads(r["params"])
+                if r["response"]:
+                    shutdown_response = json.loads(r["response"])
+                await self._shutdown_and_purge_room(
+                    room_id,
+                    delete_id,
+                    shutdown_params=shutdown_params,
+                    shutdown_response=shutdown_response,
+                )
+                continue
+
+            # If the database says we were last in the middle of purging the room,
+            # let's continue the purge process.
+            if status == DeleteStatus.STATUS_PURGING:
+                purge_now = True
+            # Or if we're at or past the scheduled purge time, let's start that one as well
+            elif status == DeleteStatus.STATUS_SCHEDULED and (
+                timestamp is None or self.clock.time_msec() >= timestamp
+            ):
+                purge_now = True
+
+            # TODO 2 stages purge, keep memberships for a while so we don't "break" sync
+            if purge_now:
+                params = {}
+                if r["params"]:
+                    params = json.loads(r["params"])
+
+                if action == DeleteStatus.ACTION_PURGE_HISTORY:
+                    if "token" in params:
+                        await self._purge_history(
+                            delete_id,
+                            room_id,
+                            params["token"],
+                            params.get("delete_local_events", False),
+                            True,
+                        )
+                elif action == DeleteStatus.ACTION_PURGE:
+                    await self.purge_room(
+                        room_id,
+                        delete_id,
+                        params.get("force", False),
+                        shutdown_response=shutdown_response,
+                    )
+
     async def purge_history_for_rooms_in_range(
         self, min_ms: Optional[int], max_ms: Optional[int]
     ) -> None:
@@ -289,8 +299,6 @@ class PaginationHandler:
 
             purge_id = random_string(16)
 
-            self._purges_by_id[purge_id] = PurgeStatus()
-
             logger.info(
                 "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
             )
@@ -305,9 +313,10 @@ class PaginationHandler:
                 room_id,
                 token,
                 True,
+                False,
             )
 
-    def start_purge_history(
+    async def start_purge_history(
         self, room_id: str, token: str, delete_local_events: bool = False
     ) -> str:
         """Start off a history purge on a room.
@@ -332,7 +341,16 @@ class PaginationHandler:
         # request id in the log lines.
         logger.info("[purge] starting purge_id %s", purge_id)
 
-        self._purges_by_id[purge_id] = PurgeStatus()
+        await self.store.upsert_room_to_delete(
+            room_id,
+            purge_id,
+            DeleteStatus.ACTION_PURGE_HISTORY,
+            DeleteStatus.STATUS_PURGING,
+            params=json.dumps(
+                {"token": token, "delete_local_events": delete_local_events}
+            ),
+        )
+
         run_as_background_process(
             "purge_history",
             self._purge_history,
@@ -340,11 +358,17 @@ class PaginationHandler:
             room_id,
             token,
             delete_local_events,
+            True,
         )
         return purge_id
 
     async def _purge_history(
-        self, purge_id: str, room_id: str, token: str, delete_local_events: bool
+        self,
+        purge_id: str,
+        room_id: str,
+        token: str,
+        delete_local_events: bool,
+        update_rooms_to_delete_table: bool,
     ) -> None:
         """Carry out a history purge on a room.
 
@@ -353,6 +377,10 @@ class PaginationHandler:
             room_id: The room to purge from
             token: topological token to delete events before
             delete_local_events: True to delete local events as well as remote ones
+            update_rooms_to_delete_table: True if we don't want to update/persist this
+                purge history action to the DB to be restorable. Used with the retention
+                functionality since we don't need to explicitly restore those, they
+                will be relaunch by the retention logic.
         """
         self._purges_in_progress_by_room.add(room_id)
         try:
@@ -361,66 +389,129 @@ class PaginationHandler:
                     room_id, token, delete_local_events
                 )
             logger.info("[purge] complete")
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+            if update_rooms_to_delete_table:
+                await self.store.upsert_room_to_delete(
+                    room_id,
+                    purge_id,
+                    DeleteStatus.ACTION_PURGE_HISTORY,
+                    DeleteStatus.STATUS_COMPLETE,
+                    timestamp=self.clock.time_msec(),
+                )
         except Exception:
             f = Failure()
             logger.error(
                 "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
             )
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
-            self._purges_by_id[purge_id].error = f.getErrorMessage()
+            if update_rooms_to_delete_table:
+                await self.store.upsert_room_to_delete(
+                    room_id,
+                    purge_id,
+                    DeleteStatus.ACTION_PURGE_HISTORY,
+                    DeleteStatus.STATUS_FAILED,
+                    error=f.getErrorMessage(),
+                    timestamp=self.clock.time_msec(),
+                )
         finally:
             self._purges_in_progress_by_room.discard(room_id)
 
-            # remove the purge from the list 24 hours after it completes
-            def clear_purge() -> None:
-                del self._purges_by_id[purge_id]
+            if update_rooms_to_delete_table:
+                # remove the purge from the list 24 hours after it completes
+                async def clear_purge() -> None:
+                    await self.store.delete_room_to_delete(room_id, purge_id)
 
-            self.hs.get_reactor().callLater(
-                PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
-            )
+                self.hs.get_reactor().callLater(
+                    PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
+                )
 
-    def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
-        """Get the current status of an active purge
+    @staticmethod
+    def _convert_to_delete_status(res: Dict[str, Any]) -> DeleteStatus:
+        status = DeleteStatus()
+        status.delete_id = res["delete_id"]
+        status.action = res["action"]
+        status.status = res["status"]
+        if "error" in res:
+            status.error = res["error"]
 
-        Args:
-            purge_id: purge_id returned by start_purge_history
-        """
-        return self._purges_by_id.get(purge_id)
+        if status.action == DeleteStatus.ACTION_SHUTDOWN and res["response"]:
+            status.shutdown_room = json.loads(res["response"])
+
+        return status
 
-    def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
+    async def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
         """Get the current status of an active deleting
 
         Args:
             delete_id: delete_id returned by start_shutdown_and_purge_room
+                or start_purge_history.
         """
-        return self._delete_by_id.get(delete_id)
+        res = await self.store.get_room_to_delete(delete_id)
+        if res:
+            return PaginationHandler._convert_to_delete_status(res)
+        return None
 
-    def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
-        """Get all active delete ids by room
+    async def get_delete_statuses_by_room(self, room_id: str) -> List[DeleteStatus]:
+        """Get all active delete statuses by room
 
         Args:
             room_id: room_id that is deleted
         """
-        return self._delete_by_room.get(room_id)
+        res = await self.store.get_rooms_to_delete(room_id)
+        return [PaginationHandler._convert_to_delete_status(r) for r in res]
 
-    async def purge_room(self, room_id: str, force: bool = False) -> None:
+    async def purge_room(
+        self,
+        room_id: str,
+        delete_id: str,
+        force: bool = False,
+        shutdown_response: Optional[ShutdownRoomResponse] = None,
+    ) -> None:
         """Purge the given room from the database.
-        This function is part the delete room v1 API.
 
         Args:
             room_id: room to be purged
+            delete_id: the delete ID for this purge
             force: set true to skip checking for joined users.
+            shutdown_response: optional response coming from the shutdown phase
         """
+        logger.info("starting purge room_id=%s force=%s", room_id, force)
+
+        action = DeleteStatus.ACTION_PURGE
+        if shutdown_response:
+            action = DeleteStatus.ACTION_SHUTDOWN
+
         async with self.pagination_lock.write(room_id):
             # first check that we have no users in this room
-            if not force:
-                joined = await self.store.is_host_joined(room_id, self._server_name)
-                if joined:
+            joined = await self.store.is_host_joined(room_id, self._server_name)
+            if joined:
+                if force:
+                    logger.info(
+                        "force-purging room %s with some local users still joined",
+                        room_id,
+                    )
+                else:
                     raise SynapseError(400, "Users are still joined to this room")
 
+            await self.store.upsert_room_to_delete(
+                room_id,
+                delete_id,
+                action,
+                DeleteStatus.STATUS_PURGING,
+                response=json.dumps(shutdown_response),
+            )
+
             await self._storage_controllers.purge_events.purge_room(room_id)
 
+            await self.store.upsert_room_to_delete(
+                room_id,
+                delete_id,
+                action,
+                DeleteStatus.STATUS_COMPLETE,
+                timestamp=self.clock.time_msec(),
+                response=json.dumps(shutdown_response),
+            )
+
+        logger.info("purge complete for room_id %s", room_id)
+
     @trace
     async def get_messages(
         self,
@@ -698,15 +789,10 @@ class PaginationHandler:
 
     async def _shutdown_and_purge_room(
         self,
-        delete_id: str,
         room_id: str,
-        requester_user_id: str,
-        new_room_user_id: Optional[str] = None,
-        new_room_name: Optional[str] = None,
-        message: Optional[str] = None,
-        block: bool = False,
-        purge: bool = True,
-        force_purge: bool = False,
+        delete_id: str,
+        shutdown_params: ShutdownRoomParams,
+        shutdown_response: Optional[ShutdownRoomResponse] = None,
     ) -> None:
         """
         Shuts down and purges a room.
@@ -716,142 +802,75 @@ class PaginationHandler:
         Args:
             delete_id: The ID for this delete.
             room_id: The ID of the room to shut down.
-            requester_user_id:
-                User who requested the action. Will be recorded as putting the room on the
-                blocking list.
-            new_room_user_id:
-                If set, a new room will be created with this user ID
-                as the creator and admin, and all users in the old room will be
-                moved into that room. If not set, no new room will be created
-                and the users will just be removed from the old room.
-            new_room_name:
-                A string representing the name of the room that new users will
-                be invited to. Defaults to `Content Violation Notification`
-            message:
-                A string containing the first message that will be sent as
-                `new_room_user_id` in the new room. Ideally this will clearly
-                convey why the original room was shut down.
-                Defaults to `Sharing illegal content on this server is not
-                permitted and rooms in violation will be blocked.`
-            block:
-                If set to `true`, this room will be added to a blocking list,
-                preventing future attempts to join the room. Defaults to `false`.
-            purge:
-                If set to `true`, purge the given room from the database.
-            force_purge:
-                If set to `true`, the room will be purged from database
-                also if it fails to remove some users from room.
-
-        Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
+            shutdown_params: parameters for the shutdown, cf `RoomShutdownHandler.ShutdownRoomParams`
+            shutdown_response: current status of the shutdown, if it was interrupted
+
+        Keeps track of the `DeleteStatus` (and `ShutdownRoomResponse`) in `self._delete_by_id` and persisted in DB
         """
 
         self._purges_in_progress_by_room.add(room_id)
         try:
-            async with self.pagination_lock.write(room_id):
-                self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
-                self._delete_by_id[
-                    delete_id
-                ].shutdown_room = await self._room_shutdown_handler.shutdown_room(
-                    room_id=room_id,
-                    requester_user_id=requester_user_id,
-                    new_room_user_id=new_room_user_id,
-                    new_room_name=new_room_name,
-                    message=message,
-                    block=block,
-                )
-                self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
-
-                if purge:
-                    logger.info("starting purge room_id %s", room_id)
-
-                    # first check that we have no users in this room
-                    if not force_purge:
-                        joined = await self.store.is_host_joined(
-                            room_id, self._server_name
-                        )
-                        if joined:
-                            raise SynapseError(
-                                400, "Users are still joined to this room"
-                            )
+            shutdown_response = await self._room_shutdown_handler.shutdown_room(
+                room_id=room_id,
+                delete_id=delete_id,
+                shutdown_params=shutdown_params,
+                shutdown_response=shutdown_response,
+            )
 
-                    await self._storage_controllers.purge_events.purge_room(room_id)
+            if shutdown_params["purge"]:
+                await self.purge_room(
+                    room_id,
+                    delete_id,
+                    shutdown_params["force_purge"],
+                    shutdown_response=shutdown_response,
+                )
 
-            logger.info("purge complete for room_id %s", room_id)
-            self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
+            await self.store.upsert_room_to_delete(
+                room_id,
+                delete_id,
+                DeleteStatus.ACTION_SHUTDOWN,
+                DeleteStatus.STATUS_COMPLETE,
+                timestamp=self.clock.time_msec(),
+                response=json.dumps(shutdown_response),
+            )
         except Exception:
             f = Failure()
             logger.error(
                 "failed",
                 exc_info=(f.type, f.value, f.getTracebackObject()),
             )
-            self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
-            self._delete_by_id[delete_id].error = f.getErrorMessage()
+            await self.store.upsert_room_to_delete(
+                room_id,
+                delete_id,
+                DeleteStatus.ACTION_SHUTDOWN,
+                DeleteStatus.STATUS_FAILED,
+                timestamp=self.clock.time_msec(),
+                error=f.getErrorMessage(),
+            )
         finally:
             self._purges_in_progress_by_room.discard(room_id)
 
-            # remove the delete from the list 24 hours after it completes
-            def clear_delete() -> None:
-                del self._delete_by_id[delete_id]
-                self._delete_by_room[room_id].remove(delete_id)
-                if not self._delete_by_room[room_id]:
-                    del self._delete_by_room[room_id]
-
-            self.hs.get_reactor().callLater(
-                PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
-            )
-
     def start_shutdown_and_purge_room(
         self,
         room_id: str,
-        requester_user_id: str,
-        new_room_user_id: Optional[str] = None,
-        new_room_name: Optional[str] = None,
-        message: Optional[str] = None,
-        block: bool = False,
-        purge: bool = True,
-        force_purge: bool = False,
+        shutdown_params: ShutdownRoomParams,
     ) -> str:
         """Start off shut down and purge on a room.
 
         Args:
             room_id: The ID of the room to shut down.
-            requester_user_id:
-                User who requested the action and put the room on the
-                blocking list.
-            new_room_user_id:
-                If set, a new room will be created with this user ID
-                as the creator and admin, and all users in the old room will be
-                moved into that room. If not set, no new room will be created
-                and the users will just be removed from the old room.
-            new_room_name:
-                A string representing the name of the room that new users will
-                be invited to. Defaults to `Content Violation Notification`
-            message:
-                A string containing the first message that will be sent as
-                `new_room_user_id` in the new room. Ideally this will clearly
-                convey why the original room was shut down.
-                Defaults to `Sharing illegal content on this server is not
-                permitted and rooms in violation will be blocked.`
-            block:
-                If set to `true`, this room will be added to a blocking list,
-                preventing future attempts to join the room. Defaults to `false`.
-            purge:
-                If set to `true`, purge the given room from the database.
-            force_purge:
-                If set to `true`, the room will be purged from database
-                also if it fails to remove some users from room.
+            shutdown_params: parameters for the shutdown, cf `RoomShutdownHandler.ShutdownRoomParams`
 
         Returns:
             unique ID for this delete transaction.
         """
         if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400, "History purge already in progress for %s" % (room_id,)
-            )
+            raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
 
         # This check is double to `RoomShutdownHandler.shutdown_room`
         # But here the requester get a direct response / error with HTTP request
         # and do not have to check the purge status
+        new_room_user_id = shutdown_params["new_room_user_id"]
         if new_room_user_id is not None:
             if not self.hs.is_mine_id(new_room_user_id):
                 raise SynapseError(
@@ -868,19 +887,11 @@ class PaginationHandler:
             delete_id,
         )
 
-        self._delete_by_id[delete_id] = DeleteStatus()
-        self._delete_by_room.setdefault(room_id, []).append(delete_id)
         run_as_background_process(
             "shutdown_and_purge_room",
             self._shutdown_and_purge_room,
-            delete_id,
             room_id,
-            requester_user_id,
-            new_room_user_id,
-            new_room_name,
-            message,
-            block,
-            purge,
-            force_purge,
+            delete_id,
+            shutdown_params,
         )
         return delete_id
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index bf907b7881..f7f9d9d2f5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -14,6 +14,7 @@
 
 """Contains functions for performing actions on rooms."""
 import itertools
+import json
 import logging
 import math
 import random
@@ -54,7 +55,6 @@ from synapse.events import EventBase
 from synapse.events.snapshot import UnpersistedEventContext
 from synapse.events.utils import copy_and_fixup_power_levels_contents
 from synapse.handlers.relations import BundledAggregations
-from synapse.module_api import NOT_SPAM
 from synapse.rest.admin._base import assert_user_is_admin
 from synapse.streams import EventSource
 from synapse.types import (
@@ -454,7 +454,7 @@ class RoomCreationHandler:
         spam_check = await self._spam_checker_module_callbacks.user_may_create_room(
             user_id
         )
-        if spam_check != NOT_SPAM:
+        if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
             raise SynapseError(
                 403,
                 "You are not permitted to create rooms",
@@ -768,7 +768,7 @@ class RoomCreationHandler:
             spam_check = await self._spam_checker_module_callbacks.user_may_create_room(
                 user_id
             )
-            if spam_check != NOT_SPAM:
+            if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
                 raise SynapseError(
                     403,
                     "You are not permitted to create rooms",
@@ -1750,6 +1750,45 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
         return self.store.get_current_room_stream_token_for_room_id(room_id)
 
 
+class ShutdownRoomParams(TypedDict):
+    """
+    Attributes:
+        requester_user_id:
+            User who requested the action. Will be recorded as putting the room on the
+            blocking list.
+        new_room_user_id:
+            If set, a new room will be created with this user ID
+            as the creator and admin, and all users in the old room will be
+            moved into that room. If not set, no new room will be created
+            and the users will just be removed from the old room.
+        new_room_name:
+            A string representing the name of the room that new users will
+            be invited to. Defaults to `Content Violation Notification`
+        message:
+            A string containing the first message that will be sent as
+            `new_room_user_id` in the new room. Ideally this will clearly
+            convey why the original room was shut down.
+            Defaults to `Sharing illegal content on this server is not
+            permitted and rooms in violation will be blocked.`
+        block:
+            If set to `true`, this room will be added to a blocking list,
+            preventing future attempts to join the room. Defaults to `false`.
+        purge:
+            If set to `true`, purge the given room from the database.
+        force_purge:
+            If set to `true`, the room will be purged from database
+            even if there are still users joined to the room.
+    """
+
+    requester_user_id: str
+    new_room_user_id: Optional[str]
+    new_room_name: Optional[str]
+    message: Optional[str]
+    block: bool
+    purge: bool
+    force_purge: bool
+
+
 class ShutdownRoomResponse(TypedDict):
     """
     Attributes:
@@ -1768,6 +1807,63 @@ class ShutdownRoomResponse(TypedDict):
     new_room_id: Optional[str]
 
 
+@attr.s(slots=True, auto_attribs=True)
+class DeleteStatus:
+    """Object tracking the status of a delete room request
+
+    This class contains information on the progress of a delete room request, for
+    return by get_delete_status.
+    """
+
+    ACTION_SHUTDOWN = "shutdown"
+    ACTION_PURGE = "purge"
+    ACTION_PURGE_HISTORY = "purge_history"
+
+    # Scheduled delete waiting to be launch at a specific time
+    STATUS_SCHEDULED = "scheduled"
+    STATUS_SHUTTING_DOWN = "shutting_down"
+    STATUS_PURGING = "purging"
+    STATUS_COMPLETE = "complete"
+    STATUS_FAILED = "failed"
+
+    delete_id: str = ""
+
+    action: str = ACTION_PURGE
+
+    # Tracks whether this request has completed.
+    # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN,WAIT_PURGE}.
+    status: str = STATUS_PURGING
+
+    # Save the error message if an error occurs
+    error: str = ""
+
+    # Saves the result of an action to give it back to REST API
+    shutdown_room: ShutdownRoomResponse = {
+        "kicked_users": [],
+        "failed_to_kick_users": [],
+        "local_aliases": [],
+        "new_room_id": None,
+    }
+
+    def asdict(self, use_purge_history_format: bool = False) -> JsonDict:
+        if not use_purge_history_format:
+            ret = {
+                "delete_id": self.delete_id,
+                "status": self.status,
+                "shutdown_room": self.shutdown_room,
+            }
+        else:
+            ret = {
+                "status": self.status
+                if self.status == DeleteStatus.STATUS_COMPLETE
+                or self.status == DeleteStatus.STATUS_FAILED
+                else "active",
+            }
+        if self.error:
+            ret["error"] = self.error
+        return ret
+
+
 class RoomShutdownHandler:
     DEFAULT_MESSAGE = (
         "Sharing illegal content on this server is not permitted and rooms in"
@@ -1787,11 +1883,9 @@ class RoomShutdownHandler:
     async def shutdown_room(
         self,
         room_id: str,
-        requester_user_id: str,
-        new_room_user_id: Optional[str] = None,
-        new_room_name: Optional[str] = None,
-        message: Optional[str] = None,
-        block: bool = False,
+        delete_id: str,
+        shutdown_params: ShutdownRoomParams,
+        shutdown_response: Optional[ShutdownRoomResponse] = None,
     ) -> ShutdownRoomResponse:
         """
         Shuts down a room. Moves all local users and room aliases automatically
@@ -1808,48 +1902,27 @@ class RoomShutdownHandler:
 
         Args:
             room_id: The ID of the room to shut down.
-            requester_user_id:
-                User who requested the action and put the room on the
-                blocking list.
-            new_room_user_id:
-                If set, a new room will be created with this user ID
-                as the creator and admin, and all users in the old room will be
-                moved into that room. If not set, no new room will be created
-                and the users will just be removed from the old room.
-            new_room_name:
-                A string representing the name of the room that new users will
-                be invited to. Defaults to `Content Violation Notification`
-            message:
-                A string containing the first message that will be sent as
-                `new_room_user_id` in the new room. Ideally this will clearly
-                convey why the original room was shut down.
-                Defaults to `Sharing illegal content on this server is not
-                permitted and rooms in violation will be blocked.`
-            block:
-                If set to `True`, users will be prevented from joining the old
-                room. This option can also be used to pre-emptively block a room,
-                even if it's unknown to this homeserver. In this case, the room
-                will be blocked, and no further action will be taken. If `False`,
-                attempting to delete an unknown room is invalid.
-
-                Defaults to `False`.
-
-        Returns: a dict containing the following keys:
-            kicked_users: An array of users (`user_id`) that were kicked.
-            failed_to_kick_users:
-                An array of users (`user_id`) that that were not kicked.
-            local_aliases:
-                An array of strings representing the local aliases that were
-                migrated from the old room to the new.
-            new_room_id:
-                A string representing the room ID of the new room, or None if
-                no such room was created.
+            delete_id: The delete ID identifying this delete request
+            shutdown_params: parameters for the shutdown, cf `ShutdownRoomParams`
+            shutdown_response: current status of the shutdown, if it was interrupted
+
+        Returns: a dict matching `ShutdownRoomResponse`.
         """
 
-        if not new_room_name:
-            new_room_name = self.DEFAULT_ROOM_NAME
-        if not message:
-            message = self.DEFAULT_MESSAGE
+        requester_user_id = shutdown_params["requester_user_id"]
+        new_room_user_id = shutdown_params["new_room_user_id"]
+        block = shutdown_params["block"]
+
+        new_room_name = (
+            shutdown_params["new_room_name"]
+            if shutdown_params["new_room_name"]
+            else self.DEFAULT_ROOM_NAME
+        )
+        message = (
+            shutdown_params["message"]
+            if shutdown_params["message"]
+            else self.DEFAULT_MESSAGE
+        )
 
         if not RoomID.is_valid(room_id):
             raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
@@ -1861,6 +1934,23 @@ class RoomShutdownHandler:
                 403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
             )
 
+        if not shutdown_response:
+            shutdown_response = {
+                "kicked_users": [],
+                "failed_to_kick_users": [],
+                "local_aliases": [],
+                "new_room_id": None,
+            }
+
+        await self.store.upsert_room_to_delete(
+            room_id,
+            delete_id,
+            DeleteStatus.ACTION_SHUTDOWN,
+            DeleteStatus.STATUS_SHUTTING_DOWN,
+            params=json.dumps(shutdown_params),
+            response=json.dumps(shutdown_response),
+        )
+
         # Action the block first (even if the room doesn't exist yet)
         if block:
             # This will work even if the room is already blocked, but that is
@@ -1869,14 +1959,10 @@ class RoomShutdownHandler:
 
         if not await self.store.get_room(room_id):
             # if we don't know about the room, there is nothing left to do.
-            return {
-                "kicked_users": [],
-                "failed_to_kick_users": [],
-                "local_aliases": [],
-                "new_room_id": None,
-            }
+            return shutdown_response
 
-        if new_room_user_id is not None:
+        new_room_id = shutdown_response.get("new_room_id")
+        if new_room_user_id is not None and new_room_id is None:
             if not self.hs.is_mine_id(new_room_user_id):
                 raise SynapseError(
                     400, "User must be our own: %s" % (new_room_user_id,)
@@ -1896,6 +1982,16 @@ class RoomShutdownHandler:
                 ratelimit=False,
             )
 
+            shutdown_response["new_room_id"] = new_room_id
+            await self.store.upsert_room_to_delete(
+                room_id,
+                delete_id,
+                DeleteStatus.ACTION_SHUTDOWN,
+                DeleteStatus.STATUS_SHUTTING_DOWN,
+                params=json.dumps(shutdown_params),
+                response=json.dumps(shutdown_response),
+            )
+
             logger.info(
                 "Shutting down room %r, joining to new room: %r", room_id, new_room_id
             )
@@ -1909,12 +2005,9 @@ class RoomShutdownHandler:
                 stream_id,
             )
         else:
-            new_room_id = None
             logger.info("Shutting down room %r", room_id)
 
         users = await self.store.get_users_in_room(room_id)
-        kicked_users = []
-        failed_to_kick_users = []
         for user_id in users:
             if not self.hs.is_mine_id(user_id):
                 continue
@@ -1943,7 +2036,9 @@ class RoomShutdownHandler:
                     stream_id,
                 )
 
-                await self.room_member_handler.forget(target_requester.user, room_id)
+                await self.room_member_handler.forget(
+                    target_requester.user, room_id, do_not_schedule_purge=True
+                )
 
                 # Join users to new room
                 if new_room_user_id:
@@ -1958,15 +2053,35 @@ class RoomShutdownHandler:
                         require_consent=False,
                     )
 
-                kicked_users.append(user_id)
+                shutdown_response["kicked_users"].append(user_id)
+                await self.store.upsert_room_to_delete(
+                    room_id,
+                    delete_id,
+                    DeleteStatus.ACTION_SHUTDOWN,
+                    DeleteStatus.STATUS_SHUTTING_DOWN,
+                    params=json.dumps(shutdown_params),
+                    response=json.dumps(shutdown_response),
+                )
             except Exception:
                 logger.exception(
                     "Failed to leave old room and join new room for %r", user_id
                 )
-                failed_to_kick_users.append(user_id)
+                shutdown_response["failed_to_kick_users"].append(user_id)
+                await self.store.upsert_room_to_delete(
+                    room_id,
+                    delete_id,
+                    DeleteStatus.ACTION_SHUTDOWN,
+                    DeleteStatus.STATUS_SHUTTING_DOWN,
+                    params=json.dumps(shutdown_params),
+                    response=json.dumps(shutdown_response),
+                )
 
         # Send message in new room and move aliases
         if new_room_user_id:
+            room_creator_requester = create_requester(
+                new_room_user_id, authenticated_entity=requester_user_id
+            )
+
             await self.event_creation_handler.create_and_send_nonmember_event(
                 room_creator_requester,
                 {
@@ -1978,18 +2093,15 @@ class RoomShutdownHandler:
                 ratelimit=False,
             )
 
-            aliases_for_room = await self.store.get_aliases_for_room(room_id)
+            shutdown_response["local_aliases"] = list(
+                await self.store.get_aliases_for_room(room_id)
+            )
 
             assert new_room_id is not None
             await self.store.update_aliases_for_room(
                 room_id, new_room_id, requester_user_id
             )
         else:
-            aliases_for_room = []
+            shutdown_response["local_aliases"] = []
 
-        return {
-            "kicked_users": kicked_users,
-            "failed_to_kick_users": failed_to_kick_users,
-            "local_aliases": list(aliases_for_room),
-            "new_room_id": new_room_id,
-        }
+        return shutdown_response
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 82e4fa7363..e3147924f7 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -38,6 +38,7 @@ from synapse.event_auth import get_named_level, get_power_level_event
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
+from synapse.handlers.room import DeleteStatus
 from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
 from synapse.logging import opentracing
 from synapse.metrics import event_processing_positions
@@ -56,6 +57,7 @@ from synapse.types import (
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_left_room
+from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -176,6 +178,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
         self._msc3970_enabled = hs.config.experimental.msc3970_enabled
 
+        self._purge_retention_period = hs.config.server.purge_retention_period
+
     def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
         """Notify the rate limiter that a room join has occurred.
 
@@ -285,7 +289,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         """
         raise NotImplementedError()
 
-    async def forget(self, user: UserID, room_id: str) -> None:
+    async def forget(
+        self, user: UserID, room_id: str, do_not_schedule_purge: bool = False
+    ) -> None:
         user_id = user.to_string()
 
         member = await self._storage_controllers.state.get_current_state_event(
@@ -305,6 +311,22 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         # the table `current_state_events` and `get_current_state_events` is `None`.
         await self.store.forget(user_id, room_id)
 
+        # If everyone locally has left the room, then there is no reason for us to keep the
+        # room around and we automatically purge room after a little bit
+        if (
+            not do_not_schedule_purge
+            and self._purge_retention_period
+            and await self.store.is_locally_forgotten_room(room_id)
+        ):
+            delete_id = random_string(16)
+            await self.store.upsert_room_to_delete(
+                room_id,
+                delete_id,
+                DeleteStatus.ACTION_PURGE,
+                DeleteStatus.STATUS_SCHEDULED,
+                timestamp=self.clock.time_msec() + self._purge_retention_period,
+            )
+
     async def ratelimit_multiple_invites(
         self,
         requester: Optional[Requester],
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index fe8177ed4d..0cabdd1dc6 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -196,7 +196,7 @@ class PurgeHistoryRestServlet(RestServlet):
                 errcode=Codes.BAD_JSON,
             )
 
-        purge_id = self.pagination_handler.start_purge_history(
+        purge_id = await self.pagination_handler.start_purge_history(
             room_id, token, delete_local_events=delete_local_events
         )
 
@@ -215,11 +215,12 @@ class PurgeHistoryStatusRestServlet(RestServlet):
     ) -> Tuple[int, JsonDict]:
         await assert_requester_is_admin(self.auth, request)
 
-        purge_status = self.pagination_handler.get_purge_status(purge_id)
+        purge_status = await self.pagination_handler.get_delete_status(purge_id)
         if purge_status is None:
             raise NotFoundError("purge id '%s' not found" % purge_id)
 
-        return HTTPStatus.OK, purge_status.asdict()
+        # TODO active vs purging etc
+        return HTTPStatus.OK, purge_status.asdict(use_purge_history_format=True)
 
 
 ########################################################################################
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 1d65560265..9e31d018b1 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -19,6 +19,7 @@ from urllib import parse as urlparse
 from synapse.api.constants import Direction, EventTypes, JoinRules, Membership
 from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
 from synapse.api.filtering import Filter
+from synapse.handlers.room import DeleteStatus
 from synapse.http.servlet import (
     ResolveRoomIdMixin,
     RestServlet,
@@ -39,6 +40,7 @@ from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, RoomID, UserID, create_requester
 from synapse.types.state import StateFilter
 from synapse.util import json_decoder
+from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
     from synapse.api.auth import Auth
@@ -119,13 +121,15 @@ class RoomRestV2Servlet(RestServlet):
 
         delete_id = self._pagination_handler.start_shutdown_and_purge_room(
             room_id=room_id,
-            new_room_user_id=content.get("new_room_user_id"),
-            new_room_name=content.get("room_name"),
-            message=content.get("message"),
-            requester_user_id=requester.user.to_string(),
-            block=block,
-            purge=purge,
-            force_purge=force_purge,
+            shutdown_params={
+                "new_room_user_id": content.get("new_room_user_id"),
+                "new_room_name": content.get("room_name"),
+                "message": content.get("message"),
+                "requester_user_id": requester.user.to_string(),
+                "block": block,
+                "purge": purge,
+                "force_purge": force_purge,
+            },
         )
 
         return HTTPStatus.OK, {"delete_id": delete_id}
@@ -150,21 +154,21 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
                 HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,)
             )
 
-        delete_ids = self._pagination_handler.get_delete_ids_by_room(room_id)
-        if delete_ids is None:
-            raise NotFoundError("No delete task for room_id '%s' found" % room_id)
+        delete_statuses = await self._pagination_handler.get_delete_statuses_by_room(
+            room_id
+        )
 
         response = []
-        for delete_id in delete_ids:
-            delete = self._pagination_handler.get_delete_status(delete_id)
-            if delete:
-                response += [
-                    {
-                        "delete_id": delete_id,
-                        **delete.asdict(),
-                    }
-                ]
-        return HTTPStatus.OK, {"results": cast(JsonDict, response)}
+        for delete_status in delete_statuses:
+            # We ignore scheduled deletes because currently they are only used
+            # for automatically purging forgotten room after X time.
+            if delete_status.status != DeleteStatus.STATUS_SCHEDULED:
+                response += [delete_status.asdict()]
+
+        if response:
+            return HTTPStatus.OK, {"results": cast(JsonDict, response)}
+        else:
+            raise NotFoundError("No delete task for room_id '%s' found" % room_id)
 
 
 class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
@@ -181,7 +185,7 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
     ) -> Tuple[int, JsonDict]:
         await assert_requester_is_admin(self._auth, request)
 
-        delete_status = self._pagination_handler.get_delete_status(delete_id)
+        delete_status = await self._pagination_handler.get_delete_status(delete_id)
         if delete_status is None:
             raise NotFoundError("delete id '%s' not found" % delete_id)
 
@@ -347,19 +351,28 @@ class RoomRestServlet(RestServlet):
                 Codes.BAD_JSON,
             )
 
+        delete_id = random_string(16)
+
         ret = await room_shutdown_handler.shutdown_room(
             room_id=room_id,
-            new_room_user_id=content.get("new_room_user_id"),
-            new_room_name=content.get("room_name"),
-            message=content.get("message"),
-            requester_user_id=requester.user.to_string(),
-            block=block,
+            delete_id=delete_id,
+            shutdown_params={
+                "new_room_user_id": content.get("new_room_user_id"),
+                "new_room_name": content.get("room_name"),
+                "message": content.get("message"),
+                "requester_user_id": requester.user.to_string(),
+                "block": block,
+                "purge": purge,
+                "force_purge": force_purge,
+            },
         )
 
         # Purge room
         if purge:
             try:
-                await pagination_handler.purge_room(room_id, force=force_purge)
+                await pagination_handler.purge_room(
+                    room_id, delete_id, force=force_purge
+                )
             except NotFoundError:
                 if block:
                     # We can block unknown rooms with this endpoint, in which case
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 582875c91a..2c0b573d4c 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -17,6 +17,7 @@ from itertools import chain
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
+    Any,
     Collection,
     Dict,
     FrozenSet,
@@ -1283,6 +1284,113 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         # If any rows still exist it means someone has not forgotten this room yet
         return not rows[0][0]
 
+    async def upsert_room_to_delete(
+        self,
+        room_id: str,
+        delete_id: str,
+        action: str,
+        status: str,
+        timestamp: Optional[int] = None,
+        params: Optional[str] = None,
+        response: Optional[str] = None,
+        error: Optional[str] = None,
+    ) -> None:
+        """Insert or update a room to shutdown/purge.
+
+        Args:
+            room_id: The room ID to shutdown/purge
+            delete_id: The delete ID identifying this action
+            action: the type of job, mainly `shutdown` `purge` or `purge_history`
+            status: Current status of the delete. Cf `DeleteStatus` for possible values
+            timestamp: Time of the last update. If status is `wait_purge`,
+                then it specifies when to do the purge, with an empty value specifying ASAP
+            error: Error message to return, if any
+            params: JSON representation of delete job parameters
+            response: JSON representation of delete current status
+        """
+        await self.db_pool.simple_upsert(
+            "rooms_to_delete",
+            {
+                "room_id": room_id,
+                "delete_id": delete_id,
+            },
+            {
+                "action": action,
+                "status": status,
+                "timestamp": timestamp,
+                "params": params,
+                "response": response,
+                "error": error,
+            },
+            desc="upsert_room_to_delete",
+        )
+
+    async def delete_room_to_delete(self, room_id: str, delete_id: str) -> None:
+        """Remove a room from the list of rooms to purge.
+
+        Args:
+            room_id: The room ID matching the delete to remove
+            delete_id: The delete ID identifying the delete to remove
+        """
+
+        await self.db_pool.simple_delete(
+            "rooms_to_delete",
+            keyvalues={
+                "room_id": room_id,
+                "delete_id": delete_id,
+            },
+            desc="delete_room_to_delete",
+        )
+
+    async def get_rooms_to_delete(
+        self, room_id: Optional[str] = None
+    ) -> List[Dict[str, Any]]:
+        """Returns all delete jobs. This includes those that have been
+        interrupted by a stop/restart of synapse, but also scheduled ones
+        like locally forgotten rooms.
+
+        Args:
+            room_id: if specified, will only return the delete jobs for a specific room
+
+        """
+        keyvalues = {}
+        if room_id is not None:
+            keyvalues["room_id"] = room_id
+
+        return await self.db_pool.simple_select_list(
+            table="rooms_to_delete",
+            keyvalues=keyvalues,
+            retcols=(
+                "room_id",
+                "delete_id",
+                "action",
+                "status",
+                "timestamp",
+                "params",
+                "response",
+                "error",
+            ),
+            desc="rooms_to_delete_fetch",
+        )
+
+    async def get_room_to_delete(self, delete_id: str) -> Optional[Dict[str, Any]]:
+        """Return the delete job identified by delete_id."""
+        return await self.db_pool.simple_select_one(
+            table="rooms_to_delete",
+            keyvalues={"delete_id": delete_id},
+            retcols=(
+                "room_id",
+                "delete_id",
+                "action",
+                "status",
+                "timestamp",
+                "params",
+                "response",
+                "error",
+            ),
+            desc="rooms_to_delete_fetch",
+        )
+
     async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]:
         """Get all rooms that the user has ever been in.
 
diff --git a/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql b/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql
new file mode 100644
index 0000000000..8f9c8c7010
--- /dev/null
+++ b/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql
@@ -0,0 +1,27 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- cf upsert_room_to_delete docstring for the meaning of the fields.
+CREATE TABLE IF NOT EXISTS rooms_to_delete(
+    room_id text NOT NULL,
+    delete_id text NOT NULL,
+    action text NOT NULL,
+    status text NOT NULL,
+    timestamp bigint,
+    params text,
+    response text,
+    error text,
+    UNIQUE(room_id, delete_id)
+);
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index eb50086c50..ba8afbc2b9 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -24,9 +24,10 @@ from twisted.test.proto_helpers import MemoryReactor
 import synapse.rest.admin
 from synapse.api.constants import EventTypes, Membership, RoomTypes
 from synapse.api.errors import Codes
-from synapse.handlers.pagination import PaginationHandler, PurgeStatus
+from synapse.handlers.pagination import DeleteStatus, PaginationHandler
 from synapse.rest.client import directory, events, login, room
 from synapse.server import HomeServer
+from synapse.types import UserID
 from synapse.util import Clock
 from synapse.util.stringutils import random_string
 
@@ -35,6 +36,9 @@ from tests import unittest
 """Tests admin REST events for /rooms paths."""
 
 
+ONE_HOUR_IN_S = 3600
+
+
 class DeleteRoomTestCase(unittest.HomeserverTestCase):
     servlets = [
         synapse.rest.admin.register_servlets,
@@ -502,6 +506,9 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
         )
         self.url_status_by_delete_id = "/_synapse/admin/v2/rooms/delete_status/"
 
+        self.room_member_handler = hs.get_room_member_handler()
+        self.pagination_handler = hs.get_pagination_handler()
+
     @parameterized.expand(
         [
             ("DELETE", "/_synapse/admin/v2/rooms/%s"),
@@ -686,8 +693,10 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
         self.assertEqual(2, len(channel.json_body["results"]))
         self.assertEqual("complete", channel.json_body["results"][0]["status"])
         self.assertEqual("complete", channel.json_body["results"][1]["status"])
-        self.assertEqual(delete_id1, channel.json_body["results"][0]["delete_id"])
-        self.assertEqual(delete_id2, channel.json_body["results"][1]["delete_id"])
+        delete_ids = {delete_id1, delete_id2}
+        self.assertTrue(channel.json_body["results"][0]["delete_id"] in delete_ids)
+        delete_ids.remove(channel.json_body["results"][0]["delete_id"])
+        self.assertTrue(channel.json_body["results"][1]["delete_id"] in delete_ids)
 
         # get status after more than clearing time for first task
         # second task is not cleared
@@ -742,7 +751,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
         self.assertEqual(400, second_channel.code, msg=second_channel.json_body)
         self.assertEqual(Codes.UNKNOWN, second_channel.json_body["errcode"])
         self.assertEqual(
-            f"History purge already in progress for {self.room_id}",
+            f"Purge already in progress for {self.room_id}",
             second_channel.json_body["error"],
         )
 
@@ -972,6 +981,121 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
         # Assert we can no longer peek into the room
         self._assert_peek(self.room_id, expect_code=403)
 
+    @unittest.override_config({"purge_retention_period": "1d"})
+    def test_purge_forgotten_room(self) -> None:
+        # Create a test room
+        room_id = self.helper.create_room_as(
+            self.admin_user,
+            tok=self.admin_user_tok,
+        )
+
+        self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
+        self.get_success(
+            self.room_member_handler.forget(
+                UserID.from_string(self.admin_user), room_id
+            )
+        )
+
+        # Test that room is not yet purged
+        with self.assertRaises(AssertionError):
+            self._is_purged(room_id)
+
+        # Advance 24 hours in the future, past the `purge_retention_period`
+        self.reactor.advance(24 * ONE_HOUR_IN_S)
+
+        self._is_purged(room_id)
+
+    def test_resume_purge_room(self) -> None:
+        # Create a test room
+        room_id = self.helper.create_room_as(
+            self.admin_user,
+            tok=self.admin_user_tok,
+        )
+        self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
+
+        self.get_success(
+            self.store.upsert_room_to_delete(
+                room_id,
+                random_string(16),
+                DeleteStatus.ACTION_PURGE,
+                DeleteStatus.STATUS_PURGING,
+            )
+        )
+
+        # Test that room is not yet purged
+        with self.assertRaises(AssertionError):
+            self._is_purged(room_id)
+
+        # Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that
+        # the automatic purging takes place and resumes the purge
+        self.reactor.advance(ONE_HOUR_IN_S)
+
+        self._is_purged(room_id)
+
+    def test_resume_shutdown_room(self) -> None:
+        # Create a test room
+        room_id = self.helper.create_room_as(
+            self.other_user,
+            tok=self.other_user_tok,
+        )
+
+        delete_id = random_string(16)
+
+        self.get_success(
+            self.store.upsert_room_to_delete(
+                room_id,
+                delete_id,
+                DeleteStatus.ACTION_SHUTDOWN,
+                DeleteStatus.STATUS_SHUTTING_DOWN,
+                params=json.dumps(
+                    {
+                        "requester_user_id": self.admin_user,
+                        "new_room_user_id": self.admin_user,
+                        "new_room_name": None,
+                        "message": None,
+                        "block": False,
+                        "purge": True,
+                        "force_purge": True,
+                    }
+                ),
+            )
+        )
+
+        # Test that room is not yet shutdown
+        self._is_member(room_id, self.other_user)
+
+        # Test that room is not yet purged
+        with self.assertRaises(AssertionError):
+            self._is_purged(room_id)
+
+        # Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that
+        # the automatic purging takes place and resumes the purge
+        self.reactor.advance(ONE_HOUR_IN_S)
+
+        # Test that all users has been kicked (room is shutdown)
+        self._has_no_members(room_id)
+
+        self._is_purged(room_id)
+
+        # Retrieve delete results
+        result = self.make_request(
+            "GET",
+            self.url_status_by_delete_id + delete_id,
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, result.code, msg=result.json_body)
+
+        # Check that the user is in kicked_users
+        self.assertIn(
+            self.other_user, result.json_body["shutdown_room"]["kicked_users"]
+        )
+
+        new_room_id = result.json_body["shutdown_room"]["new_room_id"]
+        self.assertTrue(new_room_id)
+
+        # Check that the user is actually in the new room
+        self._is_member(new_room_id, self.other_user)
+
     def _is_blocked(self, room_id: str, expect: bool = True) -> None:
         """Assert that the room is blocked or not"""
         d = self.store.is_room_blocked(room_id)
@@ -1958,13 +2082,13 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
 
         # Purge every event before the second event.
         purge_id = random_string(16)
-        pagination_handler._purges_by_id[purge_id] = PurgeStatus()
         self.get_success(
             pagination_handler._purge_history(
                 purge_id=purge_id,
                 room_id=self.room_id,
                 token=second_token_str,
                 delete_local_events=True,
+                update_rooms_to_delete_table=True,
             )
         )
 
diff --git a/tests/rest/admin/test_server_notice.py b/tests/rest/admin/test_server_notice.py
index 28b999573e..d14da9fd0e 100644
--- a/tests/rest/admin/test_server_notice.py
+++ b/tests/rest/admin/test_server_notice.py
@@ -22,6 +22,7 @@ from synapse.server import HomeServer
 from synapse.storage.roommember import RoomsForUser
 from synapse.types import JsonDict
 from synapse.util import Clock
+from synapse.util.stringutils import random_string
 
 from tests import unittest
 from tests.unittest import override_config
@@ -413,11 +414,25 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
         self.assertEqual(messages[0]["content"]["body"], "test msg one")
         self.assertEqual(messages[0]["sender"], "@notices:test")
 
+        delete_id = random_string(16)
+
         # shut down and purge room
         self.get_success(
-            self.room_shutdown_handler.shutdown_room(first_room_id, self.admin_user)
-        )
-        self.get_success(self.pagination_handler.purge_room(first_room_id))
+            self.room_shutdown_handler.shutdown_room(
+                first_room_id,
+                delete_id,
+                {
+                    "requester_user_id": self.admin_user,
+                    "new_room_user_id": None,
+                    "new_room_name": None,
+                    "message": None,
+                    "block": False,
+                    "purge": True,
+                    "force_purge": False,
+                },
+            )
+        )
+        self.get_success(self.pagination_handler.purge_room(first_room_id, "delete_id"))
 
         # user is not member anymore
         self._check_invite_and_join_status(self.other_user, 0, 0)
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index f1b4e1ad2f..98c3f99d11 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -41,7 +41,6 @@ from synapse.api.errors import Codes, HttpResponseException
 from synapse.appservice import ApplicationService
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
-from synapse.handlers.pagination import PurgeStatus
 from synapse.rest import admin
 from synapse.rest.client import account, directory, login, profile, register, room, sync
 from synapse.server import HomeServer
@@ -2090,13 +2089,13 @@ class RoomMessageListTestCase(RoomBase):
 
         # Purge every event before the second event.
         purge_id = random_string(16)
-        pagination_handler._purges_by_id[purge_id] = PurgeStatus()
         self.get_success(
             pagination_handler._purge_history(
                 purge_id=purge_id,
                 room_id=self.room_id,
                 token=second_token_str,
                 delete_local_events=True,
+                update_rooms_to_delete_table=True,
             )
         )