diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index aa26911aed..cd64142735 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Any, Dict, Optional, Set
+from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Set
import attr
@@ -22,7 +22,7 @@ from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
-from synapse.logging.context import run_in_background
+from synapse.handlers.room import ShutdownRoomResponse
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
@@ -56,11 +56,62 @@ class PurgeStatus:
STATUS_FAILED: "failed",
}
+ # Save the error message if an error occurs
+ error: str = ""
+
# Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
status: int = STATUS_ACTIVE
def asdict(self) -> JsonDict:
- return {"status": PurgeStatus.STATUS_TEXT[self.status]}
+ ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
+ if self.error:
+ ret["error"] = self.error
+ return ret
+
+
+@attr.s(slots=True, auto_attribs=True)
+class DeleteStatus:
+ """Object tracking the status of a delete room request
+
+ This class contains information on the progress of a delete room request, for
+ return by get_delete_status.
+ """
+
+ STATUS_PURGING = 0
+ STATUS_COMPLETE = 1
+ STATUS_FAILED = 2
+ STATUS_SHUTTING_DOWN = 3
+
+ STATUS_TEXT = {
+ STATUS_PURGING: "purging",
+ STATUS_COMPLETE: "complete",
+ STATUS_FAILED: "failed",
+ STATUS_SHUTTING_DOWN: "shutting_down",
+ }
+
+ # Tracks whether this request has completed.
+ # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
+ status: int = STATUS_PURGING
+
+ # Save the error message if an error occurs
+ error: str = ""
+
+ # Saves the result of an action to give it back to REST API
+ shutdown_room: ShutdownRoomResponse = {
+ "kicked_users": [],
+ "failed_to_kick_users": [],
+ "local_aliases": [],
+ "new_room_id": None,
+ }
+
+ def asdict(self) -> JsonDict:
+ ret = {
+ "status": DeleteStatus.STATUS_TEXT[self.status],
+ "shutdown_room": self.shutdown_room,
+ }
+ if self.error:
+ ret["error"] = self.error
+ return ret
class PaginationHandler:
@@ -70,6 +121,9 @@ class PaginationHandler:
paginating during a purge.
"""
+ # when to remove a completed deletion/purge from the results map
+ CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
+
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
@@ -78,11 +132,18 @@ class PaginationHandler:
self.state_store = self.storage.state
self.clock = hs.get_clock()
self._server_name = hs.hostname
+ self._room_shutdown_handler = hs.get_room_shutdown_handler()
self.pagination_lock = ReadWriteLock()
+ # IDs of rooms in which there currently an active purge *or delete* operation.
self._purges_in_progress_by_room: Set[str] = set()
# map from purge id to PurgeStatus
self._purges_by_id: Dict[str, PurgeStatus] = {}
+ # map from purge id to DeleteStatus
+ self._delete_by_id: Dict[str, DeleteStatus] = {}
+ # map from room id to delete ids
+ # Dict[`room_id`, List[`delete_id`]]
+ self._delete_by_room: Dict[str, List[str]] = {}
self._event_serializer = hs.get_event_client_serializer()
self._retention_default_max_lifetime = (
@@ -265,8 +326,13 @@ class PaginationHandler:
logger.info("[purge] starting purge_id %s", purge_id)
self._purges_by_id[purge_id] = PurgeStatus()
- run_in_background(
- self._purge_history, purge_id, room_id, token, delete_local_events
+ run_as_background_process(
+ "purge_history",
+ self._purge_history,
+ purge_id,
+ room_id,
+ token,
+ delete_local_events,
)
return purge_id
@@ -276,7 +342,7 @@ class PaginationHandler:
"""Carry out a history purge on a room.
Args:
- purge_id: The id for this purge
+ purge_id: The ID for this purge.
room_id: The room to purge from
token: topological token to delete events before
delete_local_events: True to delete local events as well as remote ones
@@ -295,6 +361,7 @@ class PaginationHandler:
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject()) # type: ignore
)
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+ self._purges_by_id[purge_id].error = f.getErrorMessage()
finally:
self._purges_in_progress_by_room.discard(room_id)
@@ -302,7 +369,9 @@ class PaginationHandler:
def clear_purge() -> None:
del self._purges_by_id[purge_id]
- self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+ self.hs.get_reactor().callLater(
+ PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
+ )
def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
"""Get the current status of an active purge
@@ -312,8 +381,25 @@ class PaginationHandler:
"""
return self._purges_by_id.get(purge_id)
+ def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
+ """Get the current status of an active deleting
+
+ Args:
+ delete_id: delete_id returned by start_shutdown_and_purge_room
+ """
+ return self._delete_by_id.get(delete_id)
+
+ def get_delete_ids_by_room(self, room_id: str) -> Optional[Collection[str]]:
+ """Get all active delete ids by room
+
+ Args:
+ room_id: room_id that is deleted
+ """
+ return self._delete_by_room.get(room_id)
+
async def purge_room(self, room_id: str, force: bool = False) -> None:
"""Purge the given room from the database.
+ This function is part the delete room v1 API.
Args:
room_id: room to be purged
@@ -472,3 +558,192 @@ class PaginationHandler:
)
return chunk
+
+ async def _shutdown_and_purge_room(
+ self,
+ delete_id: str,
+ room_id: str,
+ requester_user_id: str,
+ new_room_user_id: Optional[str] = None,
+ new_room_name: Optional[str] = None,
+ message: Optional[str] = None,
+ block: bool = False,
+ purge: bool = True,
+ force_purge: bool = False,
+ ) -> None:
+ """
+ Shuts down and purges a room.
+
+ See `RoomShutdownHandler.shutdown_room` for details of creation of the new room
+
+ Args:
+ delete_id: The ID for this delete.
+ room_id: The ID of the room to shut down.
+ requester_user_id:
+ User who requested the action. Will be recorded as putting the room on the
+ blocking list.
+ new_room_user_id:
+ If set, a new room will be created with this user ID
+ as the creator and admin, and all users in the old room will be
+ moved into that room. If not set, no new room will be created
+ and the users will just be removed from the old room.
+ new_room_name:
+ A string representing the name of the room that new users will
+ be invited to. Defaults to `Content Violation Notification`
+ message:
+ A string containing the first message that will be sent as
+ `new_room_user_id` in the new room. Ideally this will clearly
+ convey why the original room was shut down.
+ Defaults to `Sharing illegal content on this server is not
+ permitted and rooms in violation will be blocked.`
+ block:
+ If set to `true`, this room will be added to a blocking list,
+ preventing future attempts to join the room. Defaults to `false`.
+ purge:
+ If set to `true`, purge the given room from the database.
+ force_purge:
+ If set to `true`, the room will be purged from database
+ also if it fails to remove some users from room.
+
+ Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
+ """
+
+ self._purges_in_progress_by_room.add(room_id)
+ try:
+ with await self.pagination_lock.write(room_id):
+ self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
+ self._delete_by_id[
+ delete_id
+ ].shutdown_room = await self._room_shutdown_handler.shutdown_room(
+ room_id=room_id,
+ requester_user_id=requester_user_id,
+ new_room_user_id=new_room_user_id,
+ new_room_name=new_room_name,
+ message=message,
+ block=block,
+ )
+ self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
+
+ if purge:
+ logger.info("starting purge room_id %s", room_id)
+
+ # first check that we have no users in this room
+ if not force_purge:
+ joined = await self.store.is_host_joined(
+ room_id, self._server_name
+ )
+ if joined:
+ raise SynapseError(
+ 400, "Users are still joined to this room"
+ )
+
+ await self.storage.purge_events.purge_room(room_id)
+
+ logger.info("complete")
+ self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
+ except Exception:
+ f = Failure()
+ logger.error(
+ "failed",
+ exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
+ )
+ self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
+ self._delete_by_id[delete_id].error = f.getErrorMessage()
+ finally:
+ self._purges_in_progress_by_room.discard(room_id)
+
+ # remove the delete from the list 24 hours after it completes
+ def clear_delete() -> None:
+ del self._delete_by_id[delete_id]
+ self._delete_by_room[room_id].remove(delete_id)
+ if not self._delete_by_room[room_id]:
+ del self._delete_by_room[room_id]
+
+ self.hs.get_reactor().callLater(
+ PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
+ )
+
+ def start_shutdown_and_purge_room(
+ self,
+ room_id: str,
+ requester_user_id: str,
+ new_room_user_id: Optional[str] = None,
+ new_room_name: Optional[str] = None,
+ message: Optional[str] = None,
+ block: bool = False,
+ purge: bool = True,
+ force_purge: bool = False,
+ ) -> str:
+ """Start off shut down and purge on a room.
+
+ Args:
+ room_id: The ID of the room to shut down.
+ requester_user_id:
+ User who requested the action and put the room on the
+ blocking list.
+ new_room_user_id:
+ If set, a new room will be created with this user ID
+ as the creator and admin, and all users in the old room will be
+ moved into that room. If not set, no new room will be created
+ and the users will just be removed from the old room.
+ new_room_name:
+ A string representing the name of the room that new users will
+ be invited to. Defaults to `Content Violation Notification`
+ message:
+ A string containing the first message that will be sent as
+ `new_room_user_id` in the new room. Ideally this will clearly
+ convey why the original room was shut down.
+ Defaults to `Sharing illegal content on this server is not
+ permitted and rooms in violation will be blocked.`
+ block:
+ If set to `true`, this room will be added to a blocking list,
+ preventing future attempts to join the room. Defaults to `false`.
+ purge:
+ If set to `true`, purge the given room from the database.
+ force_purge:
+ If set to `true`, the room will be purged from database
+ also if it fails to remove some users from room.
+
+ Returns:
+ unique ID for this delete transaction.
+ """
+ if room_id in self._purges_in_progress_by_room:
+ raise SynapseError(
+ 400, "History purge already in progress for %s" % (room_id,)
+ )
+
+ # This check is double to `RoomShutdownHandler.shutdown_room`
+ # But here the requester get a direct response / error with HTTP request
+ # and do not have to check the purge status
+ if new_room_user_id is not None:
+ if not self.hs.is_mine_id(new_room_user_id):
+ raise SynapseError(
+ 400, "User must be our own: %s" % (new_room_user_id,)
+ )
+
+ delete_id = random_string(16)
+
+ # we log the delete_id here so that it can be tied back to the
+ # request id in the log lines.
+ logger.info(
+ "starting shutdown room_id %s with delete_id %s",
+ room_id,
+ delete_id,
+ )
+
+ self._delete_by_id[delete_id] = DeleteStatus()
+ self._delete_by_room.setdefault(room_id, []).append(delete_id)
+ run_as_background_process(
+ "shutdown_and_purge_room",
+ self._shutdown_and_purge_room,
+ delete_id,
+ room_id,
+ requester_user_id,
+ new_room_user_id,
+ new_room_name,
+ message,
+ block,
+ purge,
+ force_purge,
+ )
+ return delete_id
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 11af30eee7..f9a099c4f3 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1279,6 +1279,17 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
class ShutdownRoomResponse(TypedDict):
+ """
+ Attributes:
+ kicked_users: An array of users (`user_id`) that were kicked.
+ failed_to_kick_users:
+ An array of users (`user_id`) that that were not kicked.
+ local_aliases:
+ An array of strings representing the local aliases that were
+ migrated from the old room to the new.
+ new_room_id: A string representing the room ID of the new room.
+ """
+
kicked_users: List[str]
failed_to_kick_users: List[str]
local_aliases: List[str]
@@ -1286,7 +1297,6 @@ class ShutdownRoomResponse(TypedDict):
class RoomShutdownHandler:
-
DEFAULT_MESSAGE = (
"Sharing illegal content on this server is not permitted and rooms in"
" violation will be blocked."
@@ -1299,7 +1309,6 @@ class RoomShutdownHandler:
self._room_creation_handler = hs.get_room_creation_handler()
self._replication = hs.get_replication_data_handler()
self.event_creation_handler = hs.get_event_creation_handler()
- self.state = hs.get_state_handler()
self.store = hs.get_datastore()
async def shutdown_room(
|