summary refs log tree commit diff
path: root/synapse/handlers/pagination.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/pagination.py')
-rw-r--r--synapse/handlers/pagination.py291
1 files changed, 283 insertions, 8 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index abfe7be0e3..cd64142735 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Any, Dict, Optional, Set
+from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Set
 
 import attr
 
@@ -22,7 +22,7 @@ from twisted.python.failure import Failure
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
-from synapse.logging.context import run_in_background
+from synapse.handlers.room import ShutdownRoomResponse
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.state import StateFilter
 from synapse.streams.config import PaginationConfig
@@ -56,11 +56,62 @@ class PurgeStatus:
         STATUS_FAILED: "failed",
     }
 
+    # Save the error message if an error occurs
+    error: str = ""
+
     # Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
     status: int = STATUS_ACTIVE
 
     def asdict(self) -> JsonDict:
-        return {"status": PurgeStatus.STATUS_TEXT[self.status]}
+        ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
+        if self.error:
+            ret["error"] = self.error
+        return ret
+
+
+@attr.s(slots=True, auto_attribs=True)
+class DeleteStatus:
+    """Object tracking the status of a delete room request
+
+    This class contains information on the progress of a delete room request, for
+    return by get_delete_status.
+    """
+
+    STATUS_PURGING = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+    STATUS_SHUTTING_DOWN = 3
+
+    STATUS_TEXT = {
+        STATUS_PURGING: "purging",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+        STATUS_SHUTTING_DOWN: "shutting_down",
+    }
+
+    # Tracks whether this request has completed.
+    # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
+    status: int = STATUS_PURGING
+
+    # Save the error message if an error occurs
+    error: str = ""
+
+    # Saves the result of an action to give it back to REST API
+    shutdown_room: ShutdownRoomResponse = {
+        "kicked_users": [],
+        "failed_to_kick_users": [],
+        "local_aliases": [],
+        "new_room_id": None,
+    }
+
+    def asdict(self) -> JsonDict:
+        ret = {
+            "status": DeleteStatus.STATUS_TEXT[self.status],
+            "shutdown_room": self.shutdown_room,
+        }
+        if self.error:
+            ret["error"] = self.error
+        return ret
 
 
 class PaginationHandler:
@@ -70,6 +121,9 @@ class PaginationHandler:
     paginating during a purge.
     """
 
+    # when to remove a completed deletion/purge from the results map
+    CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24  # 24 hours
+
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.auth = hs.get_auth()
@@ -78,11 +132,18 @@ class PaginationHandler:
         self.state_store = self.storage.state
         self.clock = hs.get_clock()
         self._server_name = hs.hostname
+        self._room_shutdown_handler = hs.get_room_shutdown_handler()
 
         self.pagination_lock = ReadWriteLock()
+        # IDs of rooms in which there currently an active purge *or delete* operation.
         self._purges_in_progress_by_room: Set[str] = set()
         # map from purge id to PurgeStatus
         self._purges_by_id: Dict[str, PurgeStatus] = {}
+        # map from purge id to DeleteStatus
+        self._delete_by_id: Dict[str, DeleteStatus] = {}
+        # map from room id to delete ids
+        # Dict[`room_id`, List[`delete_id`]]
+        self._delete_by_room: Dict[str, List[str]] = {}
         self._event_serializer = hs.get_event_client_serializer()
 
         self._retention_default_max_lifetime = (
@@ -265,8 +326,13 @@ class PaginationHandler:
         logger.info("[purge] starting purge_id %s", purge_id)
 
         self._purges_by_id[purge_id] = PurgeStatus()
-        run_in_background(
-            self._purge_history, purge_id, room_id, token, delete_local_events
+        run_as_background_process(
+            "purge_history",
+            self._purge_history,
+            purge_id,
+            room_id,
+            token,
+            delete_local_events,
         )
         return purge_id
 
@@ -276,7 +342,7 @@ class PaginationHandler:
         """Carry out a history purge on a room.
 
         Args:
-            purge_id: The id for this purge
+            purge_id: The ID for this purge.
             room_id: The room to purge from
             token: topological token to delete events before
             delete_local_events: True to delete local events as well as remote ones
@@ -295,6 +361,7 @@ class PaginationHandler:
                 "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())  # type: ignore
             )
             self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+            self._purges_by_id[purge_id].error = f.getErrorMessage()
         finally:
             self._purges_in_progress_by_room.discard(room_id)
 
@@ -302,7 +369,9 @@ class PaginationHandler:
             def clear_purge() -> None:
                 del self._purges_by_id[purge_id]
 
-            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+            self.hs.get_reactor().callLater(
+                PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
+            )
 
     def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
         """Get the current status of an active purge
@@ -312,8 +381,25 @@ class PaginationHandler:
         """
         return self._purges_by_id.get(purge_id)
 
+    def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
+        """Get the current status of an active deleting
+
+        Args:
+            delete_id: delete_id returned by start_shutdown_and_purge_room
+        """
+        return self._delete_by_id.get(delete_id)
+
+    def get_delete_ids_by_room(self, room_id: str) -> Optional[Collection[str]]:
+        """Get all active delete ids by room
+
+        Args:
+            room_id: room_id that is deleted
+        """
+        return self._delete_by_room.get(room_id)
+
     async def purge_room(self, room_id: str, force: bool = False) -> None:
         """Purge the given room from the database.
+        This function is part the delete room v1 API.
 
         Args:
             room_id: room to be purged
@@ -424,7 +510,7 @@ class PaginationHandler:
 
         if events:
             if event_filter:
-                events = event_filter.filter(events)
+                events = await event_filter.filter(events)
 
             events = await filter_events_for_client(
                 self.storage, user_id, events, is_peeking=(member_event_id is None)
@@ -472,3 +558,192 @@ class PaginationHandler:
             )
 
         return chunk
+
+    async def _shutdown_and_purge_room(
+        self,
+        delete_id: str,
+        room_id: str,
+        requester_user_id: str,
+        new_room_user_id: Optional[str] = None,
+        new_room_name: Optional[str] = None,
+        message: Optional[str] = None,
+        block: bool = False,
+        purge: bool = True,
+        force_purge: bool = False,
+    ) -> None:
+        """
+        Shuts down and purges a room.
+
+        See `RoomShutdownHandler.shutdown_room` for details of creation of the new room
+
+        Args:
+            delete_id: The ID for this delete.
+            room_id: The ID of the room to shut down.
+            requester_user_id:
+                User who requested the action. Will be recorded as putting the room on the
+                blocking list.
+            new_room_user_id:
+                If set, a new room will be created with this user ID
+                as the creator and admin, and all users in the old room will be
+                moved into that room. If not set, no new room will be created
+                and the users will just be removed from the old room.
+            new_room_name:
+                A string representing the name of the room that new users will
+                be invited to. Defaults to `Content Violation Notification`
+            message:
+                A string containing the first message that will be sent as
+                `new_room_user_id` in the new room. Ideally this will clearly
+                convey why the original room was shut down.
+                Defaults to `Sharing illegal content on this server is not
+                permitted and rooms in violation will be blocked.`
+            block:
+                If set to `true`, this room will be added to a blocking list,
+                preventing future attempts to join the room. Defaults to `false`.
+            purge:
+                If set to `true`, purge the given room from the database.
+            force_purge:
+                If set to `true`, the room will be purged from database
+                also if it fails to remove some users from room.
+
+        Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
+        """
+
+        self._purges_in_progress_by_room.add(room_id)
+        try:
+            with await self.pagination_lock.write(room_id):
+                self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
+                self._delete_by_id[
+                    delete_id
+                ].shutdown_room = await self._room_shutdown_handler.shutdown_room(
+                    room_id=room_id,
+                    requester_user_id=requester_user_id,
+                    new_room_user_id=new_room_user_id,
+                    new_room_name=new_room_name,
+                    message=message,
+                    block=block,
+                )
+                self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
+
+                if purge:
+                    logger.info("starting purge room_id %s", room_id)
+
+                    # first check that we have no users in this room
+                    if not force_purge:
+                        joined = await self.store.is_host_joined(
+                            room_id, self._server_name
+                        )
+                        if joined:
+                            raise SynapseError(
+                                400, "Users are still joined to this room"
+                            )
+
+                    await self.storage.purge_events.purge_room(room_id)
+
+            logger.info("complete")
+            self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
+        except Exception:
+            f = Failure()
+            logger.error(
+                "failed",
+                exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+            )
+            self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
+            self._delete_by_id[delete_id].error = f.getErrorMessage()
+        finally:
+            self._purges_in_progress_by_room.discard(room_id)
+
+            # remove the delete from the list 24 hours after it completes
+            def clear_delete() -> None:
+                del self._delete_by_id[delete_id]
+                self._delete_by_room[room_id].remove(delete_id)
+                if not self._delete_by_room[room_id]:
+                    del self._delete_by_room[room_id]
+
+            self.hs.get_reactor().callLater(
+                PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
+            )
+
+    def start_shutdown_and_purge_room(
+        self,
+        room_id: str,
+        requester_user_id: str,
+        new_room_user_id: Optional[str] = None,
+        new_room_name: Optional[str] = None,
+        message: Optional[str] = None,
+        block: bool = False,
+        purge: bool = True,
+        force_purge: bool = False,
+    ) -> str:
+        """Start off shut down and purge on a room.
+
+        Args:
+            room_id: The ID of the room to shut down.
+            requester_user_id:
+                User who requested the action and put the room on the
+                blocking list.
+            new_room_user_id:
+                If set, a new room will be created with this user ID
+                as the creator and admin, and all users in the old room will be
+                moved into that room. If not set, no new room will be created
+                and the users will just be removed from the old room.
+            new_room_name:
+                A string representing the name of the room that new users will
+                be invited to. Defaults to `Content Violation Notification`
+            message:
+                A string containing the first message that will be sent as
+                `new_room_user_id` in the new room. Ideally this will clearly
+                convey why the original room was shut down.
+                Defaults to `Sharing illegal content on this server is not
+                permitted and rooms in violation will be blocked.`
+            block:
+                If set to `true`, this room will be added to a blocking list,
+                preventing future attempts to join the room. Defaults to `false`.
+            purge:
+                If set to `true`, purge the given room from the database.
+            force_purge:
+                If set to `true`, the room will be purged from database
+                also if it fails to remove some users from room.
+
+        Returns:
+            unique ID for this delete transaction.
+        """
+        if room_id in self._purges_in_progress_by_room:
+            raise SynapseError(
+                400, "History purge already in progress for %s" % (room_id,)
+            )
+
+        # This check is double to `RoomShutdownHandler.shutdown_room`
+        # But here the requester get a direct response / error with HTTP request
+        # and do not have to check the purge status
+        if new_room_user_id is not None:
+            if not self.hs.is_mine_id(new_room_user_id):
+                raise SynapseError(
+                    400, "User must be our own: %s" % (new_room_user_id,)
+                )
+
+        delete_id = random_string(16)
+
+        # we log the delete_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info(
+            "starting shutdown room_id %s with delete_id %s",
+            room_id,
+            delete_id,
+        )
+
+        self._delete_by_id[delete_id] = DeleteStatus()
+        self._delete_by_room.setdefault(room_id, []).append(delete_id)
+        run_as_background_process(
+            "shutdown_and_purge_room",
+            self._shutdown_and_purge_room,
+            delete_id,
+            room_id,
+            requester_user_id,
+            new_room_user_id,
+            new_room_name,
+            message,
+            block,
+            purge,
+            force_purge,
+        )
+        return delete_id