summary refs log tree commit diff
path: root/synapse/handlers/pagination.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/pagination.py')
-rw-r--r--synapse/handlers/pagination.py464
1 files changed, 155 insertions, 309 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 19cf5a2b43..878f267a4e 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -13,9 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Set
-
-import attr
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple, cast
 
 from twisted.python.failure import Failure
 
@@ -23,16 +21,22 @@ from synapse.api.constants import Direction, EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
 from synapse.events.utils import SerializeEventConfig
-from synapse.handlers.room import ShutdownRoomResponse
+from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse
 from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
 from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.rest.admin._base import assert_user_is_admin
 from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType
+from synapse.types import (
+    JsonDict,
+    JsonMapping,
+    Requester,
+    ScheduledTask,
+    StreamKeyType,
+    TaskStatus,
+)
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import ReadWriteLock
-from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
 
 if TYPE_CHECKING:
@@ -53,80 +57,11 @@ BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
 PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock"
 
 
-@attr.s(slots=True, auto_attribs=True)
-class PurgeStatus:
-    """Object tracking the status of a purge request
-
-    This class contains information on the progress of a purge request, for
-    return by get_purge_status.
-    """
-
-    STATUS_ACTIVE = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-
-    STATUS_TEXT = {
-        STATUS_ACTIVE: "active",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-    }
-
-    # Save the error message if an error occurs
-    error: str = ""
-
-    # Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
-    status: int = STATUS_ACTIVE
-
-    def asdict(self) -> JsonDict:
-        ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
-        if self.error:
-            ret["error"] = self.error
-        return ret
-
-
-@attr.s(slots=True, auto_attribs=True)
-class DeleteStatus:
-    """Object tracking the status of a delete room request
+PURGE_HISTORY_ACTION_NAME = "purge_history"
 
-    This class contains information on the progress of a delete room request, for
-    return by get_delete_status.
-    """
+PURGE_ROOM_ACTION_NAME = "purge_room"
 
-    STATUS_PURGING = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-    STATUS_SHUTTING_DOWN = 3
-
-    STATUS_TEXT = {
-        STATUS_PURGING: "purging",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-        STATUS_SHUTTING_DOWN: "shutting_down",
-    }
-
-    # Tracks whether this request has completed.
-    # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
-    status: int = STATUS_PURGING
-
-    # Save the error message if an error occurs
-    error: str = ""
-
-    # Saves the result of an action to give it back to REST API
-    shutdown_room: ShutdownRoomResponse = {
-        "kicked_users": [],
-        "failed_to_kick_users": [],
-        "local_aliases": [],
-        "new_room_id": None,
-    }
-
-    def asdict(self) -> JsonDict:
-        ret = {
-            "status": DeleteStatus.STATUS_TEXT[self.status],
-            "shutdown_room": self.shutdown_room,
-        }
-        if self.error:
-            ret["error"] = self.error
-        return ret
+SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room"
 
 
 class PaginationHandler:
@@ -136,9 +71,6 @@ class PaginationHandler:
     paginating during a purge.
     """
 
-    # when to remove a completed deletion/purge from the results map
-    CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24  # 24 hours
-
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.auth = hs.get_auth()
@@ -150,17 +82,11 @@ class PaginationHandler:
         self._room_shutdown_handler = hs.get_room_shutdown_handler()
         self._relations_handler = hs.get_relations_handler()
         self._worker_locks = hs.get_worker_locks_handler()
+        self._task_scheduler = hs.get_task_scheduler()
 
         self.pagination_lock = ReadWriteLock()
         # IDs of rooms in which there currently an active purge *or delete* operation.
         self._purges_in_progress_by_room: Set[str] = set()
-        # map from purge id to PurgeStatus
-        self._purges_by_id: Dict[str, PurgeStatus] = {}
-        # map from purge id to DeleteStatus
-        self._delete_by_id: Dict[str, DeleteStatus] = {}
-        # map from room id to delete ids
-        # Dict[`room_id`, List[`delete_id`]]
-        self._delete_by_room: Dict[str, List[str]] = {}
         self._event_serializer = hs.get_event_client_serializer()
 
         self._retention_default_max_lifetime = (
@@ -173,6 +99,9 @@ class PaginationHandler:
         self._retention_allowed_lifetime_max = (
             hs.config.retention.retention_allowed_lifetime_max
         )
+        self._forgotten_room_retention_period = (
+            hs.config.server.forgotten_room_retention_period
+        )
         self._is_master = hs.config.worker.worker_app is None
 
         if hs.config.retention.retention_enabled and self._is_master:
@@ -189,6 +118,14 @@ class PaginationHandler:
                     job.longest_max_lifetime,
                 )
 
+        self._task_scheduler.register_action(
+            self._purge_history, PURGE_HISTORY_ACTION_NAME
+        )
+        self._task_scheduler.register_action(self._purge_room, PURGE_ROOM_ACTION_NAME)
+        self._task_scheduler.register_action(
+            self._shutdown_and_purge_room, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME
+        )
+
     async def purge_history_for_rooms_in_range(
         self, min_ms: Optional[int], max_ms: Optional[int]
     ) -> None:
@@ -224,7 +161,7 @@ class PaginationHandler:
             include_null = False
 
         logger.info(
-            "[purge] Running purge job for %s < max_lifetime <= %s (include NULLs = %s)",
+            "[purge] Running retention purge job for %s < max_lifetime <= %s (include NULLs = %s)",
             min_ms,
             max_ms,
             include_null,
@@ -239,10 +176,10 @@ class PaginationHandler:
         for room_id, retention_policy in rooms.items():
             logger.info("[purge] Attempting to purge messages in room %s", room_id)
 
-            if room_id in self._purges_in_progress_by_room:
+            if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0:
                 logger.warning(
-                    "[purge] not purging room %s as there's an ongoing purge running"
-                    " for this room",
+                    "[purge] not purging room %s for retention as there's an ongoing purge"
+                    " running for this room",
                     room_id,
                 )
                 continue
@@ -295,27 +232,20 @@ class PaginationHandler:
             (stream, topo, _event_id) = r
             token = "t%d-%d" % (topo, stream)
 
-            purge_id = random_string(16)
-
-            self._purges_by_id[purge_id] = PurgeStatus()
-
-            logger.info(
-                "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
-            )
+            logger.info("Starting purging events in room %s", room_id)
 
             # We want to purge everything, including local events, and to run the purge in
             # the background so that it's not blocking any other operation apart from
             # other purges in the same room.
             run_as_background_process(
-                "_purge_history",
-                self._purge_history,
-                purge_id,
+                PURGE_HISTORY_ACTION_NAME,
+                self.purge_history,
                 room_id,
                 token,
                 True,
             )
 
-    def start_purge_history(
+    async def start_purge_history(
         self, room_id: str, token: str, delete_local_events: bool = False
     ) -> str:
         """Start off a history purge on a room.
@@ -329,40 +259,58 @@ class PaginationHandler:
         Returns:
             unique ID for this purge transaction.
         """
-        if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400, "History purge already in progress for %s" % (room_id,)
-            )
-
-        purge_id = random_string(16)
+        purge_id = await self._task_scheduler.schedule_task(
+            PURGE_HISTORY_ACTION_NAME,
+            resource_id=room_id,
+            params={"token": token, "delete_local_events": delete_local_events},
+        )
 
         # we log the purge_id here so that it can be tied back to the
         # request id in the log lines.
         logger.info("[purge] starting purge_id %s", purge_id)
 
-        self._purges_by_id[purge_id] = PurgeStatus()
-        run_as_background_process(
-            "purge_history",
-            self._purge_history,
-            purge_id,
-            room_id,
-            token,
-            delete_local_events,
-        )
         return purge_id
 
     async def _purge_history(
-        self, purge_id: str, room_id: str, token: str, delete_local_events: bool
-    ) -> None:
+        self,
+        task: ScheduledTask,
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+        """
+        Scheduler action to purge some history of a room.
+        """
+        if (
+            task.resource_id is None
+            or task.params is None
+            or "token" not in task.params
+            or "delete_local_events" not in task.params
+        ):
+            return (
+                TaskStatus.FAILED,
+                None,
+                "Not enough parameters passed to _purge_history",
+            )
+        err = await self.purge_history(
+            task.resource_id,
+            task.params["token"],
+            task.params["delete_local_events"],
+        )
+        if err is not None:
+            return TaskStatus.FAILED, None, err
+        return TaskStatus.COMPLETE, None, None
+
+    async def purge_history(
+        self,
+        room_id: str,
+        token: str,
+        delete_local_events: bool,
+    ) -> Optional[str]:
         """Carry out a history purge on a room.
 
         Args:
-            purge_id: The ID for this purge.
             room_id: The room to purge from
             token: topological token to delete events before
             delete_local_events: True to delete local events as well as remote ones
         """
-        self._purges_in_progress_by_room.add(room_id)
         try:
             async with self._worker_locks.acquire_read_write_lock(
                 PURGE_PAGINATION_LOCK_NAME, room_id, write=True
@@ -371,57 +319,68 @@ class PaginationHandler:
                     room_id, token, delete_local_events
                 )
             logger.info("[purge] complete")
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+            return None
         except Exception:
             f = Failure()
             logger.error(
                 "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
             )
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
-            self._purges_by_id[purge_id].error = f.getErrorMessage()
-        finally:
-            self._purges_in_progress_by_room.discard(room_id)
-
-            # remove the purge from the list 24 hours after it completes
-            def clear_purge() -> None:
-                del self._purges_by_id[purge_id]
-
-            self.hs.get_reactor().callLater(
-                PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
-            )
-
-    def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
-        """Get the current status of an active purge
+            return f.getErrorMessage()
 
-        Args:
-            purge_id: purge_id returned by start_purge_history
-        """
-        return self._purges_by_id.get(purge_id)
-
-    def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
+    async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]:
         """Get the current status of an active deleting
 
         Args:
             delete_id: delete_id returned by start_shutdown_and_purge_room
+                or start_purge_history.
         """
-        return self._delete_by_id.get(delete_id)
+        return await self._task_scheduler.get_task(delete_id)
 
-    def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
-        """Get all active delete ids by room
+    async def get_delete_tasks_by_room(
+        self, room_id: str, only_active: Optional[bool] = False
+    ) -> List[ScheduledTask]:
+        """Get complete, failed or active delete tasks by room
 
         Args:
             room_id: room_id that is deleted
+            only_active: if True, completed&failed tasks will be omitted
+        """
+        statuses = [TaskStatus.ACTIVE]
+        if not only_active:
+            statuses += [TaskStatus.COMPLETE, TaskStatus.FAILED]
+
+        return await self._task_scheduler.get_tasks(
+            actions=[PURGE_ROOM_ACTION_NAME, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME],
+            resource_id=room_id,
+            statuses=statuses,
+        )
+
+    async def _purge_room(
+        self,
+        task: ScheduledTask,
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+        """
+        Scheduler action to purge a room.
         """
-        return self._delete_by_room.get(room_id)
+        if not task.resource_id:
+            raise Exception("No room id passed to purge_room task")
+        params = task.params if task.params else {}
+        await self.purge_room(task.resource_id, params.get("force", False))
+        return TaskStatus.COMPLETE, None, None
 
-    async def purge_room(self, room_id: str, force: bool = False) -> None:
+    async def purge_room(
+        self,
+        room_id: str,
+        force: bool,
+    ) -> None:
         """Purge the given room from the database.
-        This function is part the delete room v1 API.
 
         Args:
             room_id: room to be purged
             force: set true to skip checking for joined users.
         """
+        logger.info("starting purge room_id=%s force=%s", room_id, force)
+
         async with self._worker_locks.acquire_multi_read_write_lock(
             [
                 (PURGE_PAGINATION_LOCK_NAME, room_id),
@@ -430,13 +389,20 @@ class PaginationHandler:
             write=True,
         ):
             # first check that we have no users in this room
-            if not force:
-                joined = await self.store.is_host_joined(room_id, self._server_name)
-                if joined:
+            joined = await self.store.is_host_joined(room_id, self._server_name)
+            if joined:
+                if force:
+                    logger.info(
+                        "force-purging room %s with some local users still joined",
+                        room_id,
+                    )
+                else:
                     raise SynapseError(400, "Users are still joined to this room")
 
             await self._storage_controllers.purge_events.purge_room(room_id)
 
+        logger.info("purge complete for room_id %s", room_id)
+
     @trace
     async def get_messages(
         self,
@@ -711,177 +677,72 @@ class PaginationHandler:
 
     async def _shutdown_and_purge_room(
         self,
-        delete_id: str,
-        room_id: str,
-        requester_user_id: Optional[str],
-        new_room_user_id: Optional[str] = None,
-        new_room_name: Optional[str] = None,
-        message: Optional[str] = None,
-        block: bool = False,
-        purge: bool = True,
-        force_purge: bool = False,
-    ) -> None:
+        task: ScheduledTask,
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
         """
-        Shuts down and purges a room.
-
-        See `RoomShutdownHandler.shutdown_room` for details of creation of the new room
-
-        Args:
-            delete_id: The ID for this delete.
-            room_id: The ID of the room to shut down.
-            requester_user_id:
-                User who requested the action. Will be recorded as putting the room on the
-                blocking list.
-                If None, the action was not manually requested but instead
-                triggered automatically, e.g. through a Synapse module
-                or some other policy.
-                MUST NOT be None if block=True.
-            new_room_user_id:
-                If set, a new room will be created with this user ID
-                as the creator and admin, and all users in the old room will be
-                moved into that room. If not set, no new room will be created
-                and the users will just be removed from the old room.
-            new_room_name:
-                A string representing the name of the room that new users will
-                be invited to. Defaults to `Content Violation Notification`
-            message:
-                A string containing the first message that will be sent as
-                `new_room_user_id` in the new room. Ideally this will clearly
-                convey why the original room was shut down.
-                Defaults to `Sharing illegal content on this server is not
-                permitted and rooms in violation will be blocked.`
-            block:
-                If set to `true`, this room will be added to a blocking list,
-                preventing future attempts to join the room. Defaults to `false`.
-            purge:
-                If set to `true`, purge the given room from the database.
-            force_purge:
-                If set to `true`, the room will be purged from database
-                also if it fails to remove some users from room.
-
-        Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
+        Scheduler action to shutdown and purge a room.
         """
+        if task.resource_id is None or task.params is None:
+            raise Exception(
+                "No room id and/or no parameters passed to shutdown_and_purge_room task"
+            )
 
-        self._purges_in_progress_by_room.add(room_id)
-        try:
-            async with self._worker_locks.acquire_read_write_lock(
-                PURGE_PAGINATION_LOCK_NAME, room_id, write=True
-            ):
-                self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
-                self._delete_by_id[
-                    delete_id
-                ].shutdown_room = await self._room_shutdown_handler.shutdown_room(
-                    room_id=room_id,
-                    requester_user_id=requester_user_id,
-                    new_room_user_id=new_room_user_id,
-                    new_room_name=new_room_name,
-                    message=message,
-                    block=block,
-                )
-                self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
+        room_id = task.resource_id
 
-                if purge:
-                    logger.info("starting purge room_id %s", room_id)
+        async def update_result(result: Optional[JsonMapping]) -> None:
+            await self._task_scheduler.update_task(task.id, result=result)
 
-                    # first check that we have no users in this room
-                    if not force_purge:
-                        joined = await self.store.is_host_joined(
-                            room_id, self._server_name
-                        )
-                        if joined:
-                            raise SynapseError(
-                                400, "Users are still joined to this room"
-                            )
+        shutdown_result = (
+            cast(ShutdownRoomResponse, task.result) if task.result else None
+        )
 
-                    await self._storage_controllers.purge_events.purge_room(room_id)
+        shutdown_result = await self._room_shutdown_handler.shutdown_room(
+            room_id,
+            cast(ShutdownRoomParams, task.params),
+            shutdown_result,
+            update_result,
+        )
 
-            logger.info("purge complete for room_id %s", room_id)
-            self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
-        except Exception:
-            f = Failure()
-            logger.error(
-                "failed",
-                exc_info=(f.type, f.value, f.getTracebackObject()),
-            )
-            self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
-            self._delete_by_id[delete_id].error = f.getErrorMessage()
-        finally:
-            self._purges_in_progress_by_room.discard(room_id)
-
-            # remove the delete from the list 24 hours after it completes
-            def clear_delete() -> None:
-                del self._delete_by_id[delete_id]
-                self._delete_by_room[room_id].remove(delete_id)
-                if not self._delete_by_room[room_id]:
-                    del self._delete_by_room[room_id]
-
-            self.hs.get_reactor().callLater(
-                PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
+        if task.params.get("purge", False):
+            await self.purge_room(
+                room_id,
+                task.params.get("force_purge", False),
             )
 
-    def start_shutdown_and_purge_room(
+        return (TaskStatus.COMPLETE, shutdown_result, None)
+
+    async def start_shutdown_and_purge_room(
         self,
         room_id: str,
-        requester_user_id: Optional[str],
-        new_room_user_id: Optional[str] = None,
-        new_room_name: Optional[str] = None,
-        message: Optional[str] = None,
-        block: bool = False,
-        purge: bool = True,
-        force_purge: bool = False,
+        shutdown_params: ShutdownRoomParams,
     ) -> str:
         """Start off shut down and purge on a room.
 
         Args:
             room_id: The ID of the room to shut down.
-            requester_user_id:
-                User who requested the action and put the room on the
-                blocking list.
-                If None, the action was not manually requested but instead
-                triggered automatically, e.g. through a Synapse module
-                or some other policy.
-                MUST NOT be None if block=True.
-            new_room_user_id:
-                If set, a new room will be created with this user ID
-                as the creator and admin, and all users in the old room will be
-                moved into that room. If not set, no new room will be created
-                and the users will just be removed from the old room.
-            new_room_name:
-                A string representing the name of the room that new users will
-                be invited to. Defaults to `Content Violation Notification`
-            message:
-                A string containing the first message that will be sent as
-                `new_room_user_id` in the new room. Ideally this will clearly
-                convey why the original room was shut down.
-                Defaults to `Sharing illegal content on this server is not
-                permitted and rooms in violation will be blocked.`
-            block:
-                If set to `true`, this room will be added to a blocking list,
-                preventing future attempts to join the room. Defaults to `false`.
-            purge:
-                If set to `true`, purge the given room from the database.
-            force_purge:
-                If set to `true`, the room will be purged from database
-                also if it fails to remove some users from room.
+            shutdown_params: parameters for the shutdown
 
         Returns:
             unique ID for this delete transaction.
         """
-        if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400, "History purge already in progress for %s" % (room_id,)
-            )
+        if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0:
+            raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
 
         # This check is double to `RoomShutdownHandler.shutdown_room`
         # But here the requester get a direct response / error with HTTP request
         # and do not have to check the purge status
+        new_room_user_id = shutdown_params["new_room_user_id"]
         if new_room_user_id is not None:
             if not self.hs.is_mine_id(new_room_user_id):
                 raise SynapseError(
                     400, "User must be our own: %s" % (new_room_user_id,)
                 )
 
-        delete_id = random_string(16)
+        delete_id = await self._task_scheduler.schedule_task(
+            SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME,
+            resource_id=room_id,
+            params=shutdown_params,
+        )
 
         # we log the delete_id here so that it can be tied back to the
         # request id in the log lines.
@@ -891,19 +752,4 @@ class PaginationHandler:
             delete_id,
         )
 
-        self._delete_by_id[delete_id] = DeleteStatus()
-        self._delete_by_room.setdefault(room_id, []).append(delete_id)
-        run_as_background_process(
-            "shutdown_and_purge_room",
-            self._shutdown_and_purge_room,
-            delete_id,
-            room_id,
-            requester_user_id,
-            new_room_user_id,
-            new_room_name,
-            message,
-            block,
-            purge,
-            force_purge,
-        )
         return delete_id