diff options
-rw-r--r-- | changelog.d/15488.feature | 1 | ||||
-rw-r--r-- | docs/usage/configuration/config_documentation.md | 19 | ||||
-rw-r--r-- | synapse/config/server.py | 9 | ||||
-rw-r--r-- | synapse/handlers/pagination.py | 491 | ||||
-rw-r--r-- | synapse/handlers/room.py | 250 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 24 | ||||
-rw-r--r-- | synapse/rest/admin/__init__.py | 7 | ||||
-rw-r--r-- | synapse/rest/admin/rooms.py | 67 | ||||
-rw-r--r-- | synapse/storage/databases/main/roommember.py | 108 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql | 27 | ||||
-rw-r--r-- | tests/rest/admin/test_room.py | 134 | ||||
-rw-r--r-- | tests/rest/admin/test_server_notice.py | 21 | ||||
-rw-r--r-- | tests/rest/client/test_rooms.py | 3 |
13 files changed, 810 insertions, 351 deletions
diff --git a/changelog.d/15488.feature b/changelog.d/15488.feature new file mode 100644 index 0000000000..8684d84192 --- /dev/null +++ b/changelog.d/15488.feature @@ -0,0 +1 @@ +Add automatic purge after all users forgotten a room. Also add restore of purge/shutdown rooms after a synapse restart. diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 22cd1772dc..69b4a6b035 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -923,7 +923,7 @@ allowed_avatar_mimetypes: ["image/png", "image/jpeg", "image/gif"] How long to keep redacted events in unredacted form in the database. After this period redacted events get replaced with their redacted form in the DB. -Synapse will check whether the rentention period has concluded for redacted +Synapse will check whether the retention period has concluded for redacted events every 5 minutes. Thus, even if this option is set to `0`, Synapse may still take up to 5 minutes to purge redacted events from the database. @@ -934,6 +934,23 @@ Example configuration: redaction_retention_period: 28d ``` --- +--- +### `purge_retention_period` + +How long to keep locally forgotten room in the DB. After this period the room +will be fully purged from the DB. + +Synapse will check whether the retention period has concluded for room +purges every hour. Thus, even if this option is set to `0`, Synapse may +still take up to one hour to purge forgotten rooms from the database. + +Defaults to `7d`. Set to `null` to disable. + +Example configuration: +```yaml +purge_retention_period: 28d +``` +--- ### `user_ips_max_age` How long to track users' last seen time and IPs in the database. diff --git a/synapse/config/server.py b/synapse/config/server.py index b46fa51593..cb93e56678 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -486,6 +486,15 @@ class ServerConfig(Config): else: self.redaction_retention_period = None + # How long to keep locally forgotten rooms before purging them. + purge_retention_period = config.get("purge_retention_period", "7d") + if purge_retention_period is not None: + self.purge_retention_period: Optional[int] = self.parse_duration( + purge_retention_period + ) + else: + self.purge_retention_period = None + # How long to keep entries in the `users_ips` table. user_ips_max_age = config.get("user_ips_max_age", "28d") if user_ips_max_age is not None: diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 19b8728db9..982b38cc43 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -12,10 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import json import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Set - -import attr +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set from twisted.python.failure import Failure @@ -23,12 +22,12 @@ from synapse.api.constants import Direction, EventTypes, Membership from synapse.api.errors import SynapseError from synapse.api.filtering import Filter from synapse.events.utils import SerializeEventConfig -from synapse.handlers.room import ShutdownRoomResponse +from synapse.handlers.room import DeleteStatus, ShutdownRoomParams, ShutdownRoomResponse from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.admin._base import assert_user_is_admin from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType +from synapse.types import JsonDict, Requester, StreamKeyType from synapse.types.state import StateFilter from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string @@ -46,82 +45,6 @@ logger = logging.getLogger(__name__) BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3 -@attr.s(slots=True, auto_attribs=True) -class PurgeStatus: - """Object tracking the status of a purge request - - This class contains information on the progress of a purge request, for - return by get_purge_status. - """ - - STATUS_ACTIVE = 0 - STATUS_COMPLETE = 1 - STATUS_FAILED = 2 - - STATUS_TEXT = { - STATUS_ACTIVE: "active", - STATUS_COMPLETE: "complete", - STATUS_FAILED: "failed", - } - - # Save the error message if an error occurs - error: str = "" - - # Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}. - status: int = STATUS_ACTIVE - - def asdict(self) -> JsonDict: - ret = {"status": PurgeStatus.STATUS_TEXT[self.status]} - if self.error: - ret["error"] = self.error - return ret - - -@attr.s(slots=True, auto_attribs=True) -class DeleteStatus: - """Object tracking the status of a delete room request - - This class contains information on the progress of a delete room request, for - return by get_delete_status. - """ - - STATUS_PURGING = 0 - STATUS_COMPLETE = 1 - STATUS_FAILED = 2 - STATUS_SHUTTING_DOWN = 3 - - STATUS_TEXT = { - STATUS_PURGING: "purging", - STATUS_COMPLETE: "complete", - STATUS_FAILED: "failed", - STATUS_SHUTTING_DOWN: "shutting_down", - } - - # Tracks whether this request has completed. - # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}. - status: int = STATUS_PURGING - - # Save the error message if an error occurs - error: str = "" - - # Saves the result of an action to give it back to REST API - shutdown_room: ShutdownRoomResponse = { - "kicked_users": [], - "failed_to_kick_users": [], - "local_aliases": [], - "new_room_id": None, - } - - def asdict(self) -> JsonDict: - ret = { - "status": DeleteStatus.STATUS_TEXT[self.status], - "shutdown_room": self.shutdown_room, - } - if self.error: - ret["error"] = self.error - return ret - - class PaginationHandler: """Handles pagination and purge history requests. @@ -132,6 +55,9 @@ class PaginationHandler: # when to remove a completed deletion/purge from the results map CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours + # how often to run the purge rooms loop + PURGE_ROOMS_INTERVAL_MS = 1000 * 3600 # 1 hour + def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() @@ -146,13 +72,6 @@ class PaginationHandler: self.pagination_lock = ReadWriteLock() # IDs of rooms in which there currently an active purge *or delete* operation. self._purges_in_progress_by_room: Set[str] = set() - # map from purge id to PurgeStatus - self._purges_by_id: Dict[str, PurgeStatus] = {} - # map from purge id to DeleteStatus - self._delete_by_id: Dict[str, DeleteStatus] = {} - # map from room id to delete ids - # Dict[`room_id`, List[`delete_id`]] - self._delete_by_room: Dict[str, List[str]] = {} self._event_serializer = hs.get_event_client_serializer() self._retention_default_max_lifetime = ( @@ -165,6 +84,7 @@ class PaginationHandler: self._retention_allowed_lifetime_max = ( hs.config.retention.retention_allowed_lifetime_max ) + self._purge_retention_period = hs.config.server.purge_retention_period self._is_master = hs.config.worker.worker_app is None if hs.config.retention.retention_enabled and self._is_master: @@ -181,6 +101,96 @@ class PaginationHandler: job.longest_max_lifetime, ) + if self._is_master: + self.clock.looping_call( + run_as_background_process, + PaginationHandler.PURGE_ROOMS_INTERVAL_MS, + "purge_rooms", + self.purge_rooms, + ) + + async def purge_rooms(self) -> None: + """This takes care of restoring unfinished purge/shutdown rooms from the DB. + It also takes care to launch scheduled ones, like rooms that has been fully + forgotten. + + It should be run regularly. + """ + rooms_to_delete = await self.store.get_rooms_to_delete() + for r in rooms_to_delete: + room_id = r["room_id"] + delete_id = r["delete_id"] + status = r["status"] + action = r["action"] + timestamp = r["timestamp"] + + if ( + status == DeleteStatus.STATUS_COMPLETE + or status == DeleteStatus.STATUS_FAILED + ): + # remove the delete from the list 24 hours after it completes or fails + ms_since_completed = self.clock.time_msec() - timestamp + if ms_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS: + await self.store.delete_room_to_delete(room_id, delete_id) + + continue + + if room_id in self._purges_in_progress_by_room: + # a delete background task is already running (or has run) + # for this room id, let's ignore it for now + continue + + # If the database says we were last in the middle of shutting down the room, + # let's continue the shutdown process. + shutdown_response = None + if ( + action == DeleteStatus.ACTION_SHUTDOWN + and status == DeleteStatus.STATUS_SHUTTING_DOWN + ): + shutdown_params = json.loads(r["params"]) + if r["response"]: + shutdown_response = json.loads(r["response"]) + await self._shutdown_and_purge_room( + room_id, + delete_id, + shutdown_params=shutdown_params, + shutdown_response=shutdown_response, + ) + continue + + # If the database says we were last in the middle of purging the room, + # let's continue the purge process. + if status == DeleteStatus.STATUS_PURGING: + purge_now = True + # Or if we're at or past the scheduled purge time, let's start that one as well + elif status == DeleteStatus.STATUS_SCHEDULED and ( + timestamp is None or self.clock.time_msec() >= timestamp + ): + purge_now = True + + # TODO 2 stages purge, keep memberships for a while so we don't "break" sync + if purge_now: + params = {} + if r["params"]: + params = json.loads(r["params"]) + + if action == DeleteStatus.ACTION_PURGE_HISTORY: + if "token" in params: + await self._purge_history( + delete_id, + room_id, + params["token"], + params.get("delete_local_events", False), + True, + ) + elif action == DeleteStatus.ACTION_PURGE: + await self.purge_room( + room_id, + delete_id, + params.get("force", False), + shutdown_response=shutdown_response, + ) + async def purge_history_for_rooms_in_range( self, min_ms: Optional[int], max_ms: Optional[int] ) -> None: @@ -289,8 +299,6 @@ class PaginationHandler: purge_id = random_string(16) - self._purges_by_id[purge_id] = PurgeStatus() - logger.info( "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id) ) @@ -305,9 +313,10 @@ class PaginationHandler: room_id, token, True, + False, ) - def start_purge_history( + async def start_purge_history( self, room_id: str, token: str, delete_local_events: bool = False ) -> str: """Start off a history purge on a room. @@ -332,7 +341,16 @@ class PaginationHandler: # request id in the log lines. logger.info("[purge] starting purge_id %s", purge_id) - self._purges_by_id[purge_id] = PurgeStatus() + await self.store.upsert_room_to_delete( + room_id, + purge_id, + DeleteStatus.ACTION_PURGE_HISTORY, + DeleteStatus.STATUS_PURGING, + params=json.dumps( + {"token": token, "delete_local_events": delete_local_events} + ), + ) + run_as_background_process( "purge_history", self._purge_history, @@ -340,11 +358,17 @@ class PaginationHandler: room_id, token, delete_local_events, + True, ) return purge_id async def _purge_history( - self, purge_id: str, room_id: str, token: str, delete_local_events: bool + self, + purge_id: str, + room_id: str, + token: str, + delete_local_events: bool, + update_rooms_to_delete_table: bool, ) -> None: """Carry out a history purge on a room. @@ -353,6 +377,10 @@ class PaginationHandler: room_id: The room to purge from token: topological token to delete events before delete_local_events: True to delete local events as well as remote ones + update_rooms_to_delete_table: True if we don't want to update/persist this + purge history action to the DB to be restorable. Used with the retention + functionality since we don't need to explicitly restore those, they + will be relaunch by the retention logic. """ self._purges_in_progress_by_room.add(room_id) try: @@ -361,66 +389,129 @@ class PaginationHandler: room_id, token, delete_local_events ) logger.info("[purge] complete") - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE + if update_rooms_to_delete_table: + await self.store.upsert_room_to_delete( + room_id, + purge_id, + DeleteStatus.ACTION_PURGE_HISTORY, + DeleteStatus.STATUS_COMPLETE, + timestamp=self.clock.time_msec(), + ) except Exception: f = Failure() logger.error( "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject()) ) - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED - self._purges_by_id[purge_id].error = f.getErrorMessage() + if update_rooms_to_delete_table: + await self.store.upsert_room_to_delete( + room_id, + purge_id, + DeleteStatus.ACTION_PURGE_HISTORY, + DeleteStatus.STATUS_FAILED, + error=f.getErrorMessage(), + timestamp=self.clock.time_msec(), + ) finally: self._purges_in_progress_by_room.discard(room_id) - # remove the purge from the list 24 hours after it completes - def clear_purge() -> None: - del self._purges_by_id[purge_id] + if update_rooms_to_delete_table: + # remove the purge from the list 24 hours after it completes + async def clear_purge() -> None: + await self.store.delete_room_to_delete(room_id, purge_id) - self.hs.get_reactor().callLater( - PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge - ) + self.hs.get_reactor().callLater( + PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge + ) - def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]: - """Get the current status of an active purge + @staticmethod + def _convert_to_delete_status(res: Dict[str, Any]) -> DeleteStatus: + status = DeleteStatus() + status.delete_id = res["delete_id"] + status.action = res["action"] + status.status = res["status"] + if "error" in res: + status.error = res["error"] - Args: - purge_id: purge_id returned by start_purge_history - """ - return self._purges_by_id.get(purge_id) + if status.action == DeleteStatus.ACTION_SHUTDOWN and res["response"]: + status.shutdown_room = json.loads(res["response"]) + + return status - def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]: + async def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]: """Get the current status of an active deleting Args: delete_id: delete_id returned by start_shutdown_and_purge_room + or start_purge_history. """ - return self._delete_by_id.get(delete_id) + res = await self.store.get_room_to_delete(delete_id) + if res: + return PaginationHandler._convert_to_delete_status(res) + return None - def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]: - """Get all active delete ids by room + async def get_delete_statuses_by_room(self, room_id: str) -> List[DeleteStatus]: + """Get all active delete statuses by room Args: room_id: room_id that is deleted """ - return self._delete_by_room.get(room_id) + res = await self.store.get_rooms_to_delete(room_id) + return [PaginationHandler._convert_to_delete_status(r) for r in res] - async def purge_room(self, room_id: str, force: bool = False) -> None: + async def purge_room( + self, + room_id: str, + delete_id: str, + force: bool = False, + shutdown_response: Optional[ShutdownRoomResponse] = None, + ) -> None: """Purge the given room from the database. - This function is part the delete room v1 API. Args: room_id: room to be purged + delete_id: the delete ID for this purge force: set true to skip checking for joined users. + shutdown_response: optional response coming from the shutdown phase """ + logger.info("starting purge room_id=%s force=%s", room_id, force) + + action = DeleteStatus.ACTION_PURGE + if shutdown_response: + action = DeleteStatus.ACTION_SHUTDOWN + async with self.pagination_lock.write(room_id): # first check that we have no users in this room - if not force: - joined = await self.store.is_host_joined(room_id, self._server_name) - if joined: + joined = await self.store.is_host_joined(room_id, self._server_name) + if joined: + if force: + logger.info( + "force-purging room %s with some local users still joined", + room_id, + ) + else: raise SynapseError(400, "Users are still joined to this room") + await self.store.upsert_room_to_delete( + room_id, + delete_id, + action, + DeleteStatus.STATUS_PURGING, + response=json.dumps(shutdown_response), + ) + await self._storage_controllers.purge_events.purge_room(room_id) + await self.store.upsert_room_to_delete( + room_id, + delete_id, + action, + DeleteStatus.STATUS_COMPLETE, + timestamp=self.clock.time_msec(), + response=json.dumps(shutdown_response), + ) + + logger.info("purge complete for room_id %s", room_id) + @trace async def get_messages( self, @@ -698,15 +789,10 @@ class PaginationHandler: async def _shutdown_and_purge_room( self, - delete_id: str, room_id: str, - requester_user_id: str, - new_room_user_id: Optional[str] = None, - new_room_name: Optional[str] = None, - message: Optional[str] = None, - block: bool = False, - purge: bool = True, - force_purge: bool = False, + delete_id: str, + shutdown_params: ShutdownRoomParams, + shutdown_response: Optional[ShutdownRoomResponse] = None, ) -> None: """ Shuts down and purges a room. @@ -716,142 +802,75 @@ class PaginationHandler: Args: delete_id: The ID for this delete. room_id: The ID of the room to shut down. - requester_user_id: - User who requested the action. Will be recorded as putting the room on the - blocking list. - new_room_user_id: - If set, a new room will be created with this user ID - as the creator and admin, and all users in the old room will be - moved into that room. If not set, no new room will be created - and the users will just be removed from the old room. - new_room_name: - A string representing the name of the room that new users will - be invited to. Defaults to `Content Violation Notification` - message: - A string containing the first message that will be sent as - `new_room_user_id` in the new room. Ideally this will clearly - convey why the original room was shut down. - Defaults to `Sharing illegal content on this server is not - permitted and rooms in violation will be blocked.` - block: - If set to `true`, this room will be added to a blocking list, - preventing future attempts to join the room. Defaults to `false`. - purge: - If set to `true`, purge the given room from the database. - force_purge: - If set to `true`, the room will be purged from database - also if it fails to remove some users from room. - - Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`: + shutdown_params: parameters for the shutdown, cf `RoomShutdownHandler.ShutdownRoomParams` + shutdown_response: current status of the shutdown, if it was interrupted + + Keeps track of the `DeleteStatus` (and `ShutdownRoomResponse`) in `self._delete_by_id` and persisted in DB """ self._purges_in_progress_by_room.add(room_id) try: - async with self.pagination_lock.write(room_id): - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN - self._delete_by_id[ - delete_id - ].shutdown_room = await self._room_shutdown_handler.shutdown_room( - room_id=room_id, - requester_user_id=requester_user_id, - new_room_user_id=new_room_user_id, - new_room_name=new_room_name, - message=message, - block=block, - ) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING - - if purge: - logger.info("starting purge room_id %s", room_id) - - # first check that we have no users in this room - if not force_purge: - joined = await self.store.is_host_joined( - room_id, self._server_name - ) - if joined: - raise SynapseError( - 400, "Users are still joined to this room" - ) + shutdown_response = await self._room_shutdown_handler.shutdown_room( + room_id=room_id, + delete_id=delete_id, + shutdown_params=shutdown_params, + shutdown_response=shutdown_response, + ) - await self._storage_controllers.purge_events.purge_room(room_id) + if shutdown_params["purge"]: + await self.purge_room( + room_id, + delete_id, + shutdown_params["force_purge"], + shutdown_response=shutdown_response, + ) - logger.info("purge complete for room_id %s", room_id) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE + await self.store.upsert_room_to_delete( + room_id, + delete_id, + DeleteStatus.ACTION_SHUTDOWN, + DeleteStatus.STATUS_COMPLETE, + timestamp=self.clock.time_msec(), + response=json.dumps(shutdown_response), + ) except Exception: f = Failure() logger.error( "failed", exc_info=(f.type, f.value, f.getTracebackObject()), ) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED - self._delete_by_id[delete_id].error = f.getErrorMessage() + await self.store.upsert_room_to_delete( + room_id, + delete_id, + DeleteStatus.ACTION_SHUTDOWN, + DeleteStatus.STATUS_FAILED, + timestamp=self.clock.time_msec(), + error=f.getErrorMessage(), + ) finally: self._purges_in_progress_by_room.discard(room_id) - # remove the delete from the list 24 hours after it completes - def clear_delete() -> None: - del self._delete_by_id[delete_id] - self._delete_by_room[room_id].remove(delete_id) - if not self._delete_by_room[room_id]: - del self._delete_by_room[room_id] - - self.hs.get_reactor().callLater( - PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete - ) - def start_shutdown_and_purge_room( self, room_id: str, - requester_user_id: str, - new_room_user_id: Optional[str] = None, - new_room_name: Optional[str] = None, - message: Optional[str] = None, - block: bool = False, - purge: bool = True, - force_purge: bool = False, + shutdown_params: ShutdownRoomParams, ) -> str: """Start off shut down and purge on a room. Args: room_id: The ID of the room to shut down. - requester_user_id: - User who requested the action and put the room on the - blocking list. - new_room_user_id: - If set, a new room will be created with this user ID - as the creator and admin, and all users in the old room will be - moved into that room. If not set, no new room will be created - and the users will just be removed from the old room. - new_room_name: - A string representing the name of the room that new users will - be invited to. Defaults to `Content Violation Notification` - message: - A string containing the first message that will be sent as - `new_room_user_id` in the new room. Ideally this will clearly - convey why the original room was shut down. - Defaults to `Sharing illegal content on this server is not - permitted and rooms in violation will be blocked.` - block: - If set to `true`, this room will be added to a blocking list, - preventing future attempts to join the room. Defaults to `false`. - purge: - If set to `true`, purge the given room from the database. - force_purge: - If set to `true`, the room will be purged from database - also if it fails to remove some users from room. + shutdown_params: parameters for the shutdown, cf `RoomShutdownHandler.ShutdownRoomParams` Returns: unique ID for this delete transaction. """ if room_id in self._purges_in_progress_by_room: - raise SynapseError( - 400, "History purge already in progress for %s" % (room_id,) - ) + raise SynapseError(400, "Purge already in progress for %s" % (room_id,)) # This check is double to `RoomShutdownHandler.shutdown_room` # But here the requester get a direct response / error with HTTP request # and do not have to check the purge status + new_room_user_id = shutdown_params["new_room_user_id"] if new_room_user_id is not None: if not self.hs.is_mine_id(new_room_user_id): raise SynapseError( @@ -868,19 +887,11 @@ class PaginationHandler: delete_id, ) - self._delete_by_id[delete_id] = DeleteStatus() - self._delete_by_room.setdefault(room_id, []).append(delete_id) run_as_background_process( "shutdown_and_purge_room", self._shutdown_and_purge_room, - delete_id, room_id, - requester_user_id, - new_room_user_id, - new_room_name, - message, - block, - purge, - force_purge, + delete_id, + shutdown_params, ) return delete_id diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bf907b7881..f7f9d9d2f5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -14,6 +14,7 @@ """Contains functions for performing actions on rooms.""" import itertools +import json import logging import math import random @@ -54,7 +55,6 @@ from synapse.events import EventBase from synapse.events.snapshot import UnpersistedEventContext from synapse.events.utils import copy_and_fixup_power_levels_contents from synapse.handlers.relations import BundledAggregations -from synapse.module_api import NOT_SPAM from synapse.rest.admin._base import assert_user_is_admin from synapse.streams import EventSource from synapse.types import ( @@ -454,7 +454,7 @@ class RoomCreationHandler: spam_check = await self._spam_checker_module_callbacks.user_may_create_room( user_id ) - if spam_check != NOT_SPAM: + if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: raise SynapseError( 403, "You are not permitted to create rooms", @@ -768,7 +768,7 @@ class RoomCreationHandler: spam_check = await self._spam_checker_module_callbacks.user_may_create_room( user_id ) - if spam_check != NOT_SPAM: + if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: raise SynapseError( 403, "You are not permitted to create rooms", @@ -1750,6 +1750,45 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]): return self.store.get_current_room_stream_token_for_room_id(room_id) +class ShutdownRoomParams(TypedDict): + """ + Attributes: + requester_user_id: + User who requested the action. Will be recorded as putting the room on the + blocking list. + new_room_user_id: + If set, a new room will be created with this user ID + as the creator and admin, and all users in the old room will be + moved into that room. If not set, no new room will be created + and the users will just be removed from the old room. + new_room_name: + A string representing the name of the room that new users will + be invited to. Defaults to `Content Violation Notification` + message: + A string containing the first message that will be sent as + `new_room_user_id` in the new room. Ideally this will clearly + convey why the original room was shut down. + Defaults to `Sharing illegal content on this server is not + permitted and rooms in violation will be blocked.` + block: + If set to `true`, this room will be added to a blocking list, + preventing future attempts to join the room. Defaults to `false`. + purge: + If set to `true`, purge the given room from the database. + force_purge: + If set to `true`, the room will be purged from database + even if there are still users joined to the room. + """ + + requester_user_id: str + new_room_user_id: Optional[str] + new_room_name: Optional[str] + message: Optional[str] + block: bool + purge: bool + force_purge: bool + + class ShutdownRoomResponse(TypedDict): """ Attributes: @@ -1768,6 +1807,63 @@ class ShutdownRoomResponse(TypedDict): new_room_id: Optional[str] +@attr.s(slots=True, auto_attribs=True) +class DeleteStatus: + """Object tracking the status of a delete room request + + This class contains information on the progress of a delete room request, for + return by get_delete_status. + """ + + ACTION_SHUTDOWN = "shutdown" + ACTION_PURGE = "purge" + ACTION_PURGE_HISTORY = "purge_history" + + # Scheduled delete waiting to be launch at a specific time + STATUS_SCHEDULED = "scheduled" + STATUS_SHUTTING_DOWN = "shutting_down" + STATUS_PURGING = "purging" + STATUS_COMPLETE = "complete" + STATUS_FAILED = "failed" + + delete_id: str = "" + + action: str = ACTION_PURGE + + # Tracks whether this request has completed. + # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN,WAIT_PURGE}. + status: str = STATUS_PURGING + + # Save the error message if an error occurs + error: str = "" + + # Saves the result of an action to give it back to REST API + shutdown_room: ShutdownRoomResponse = { + "kicked_users": [], + "failed_to_kick_users": [], + "local_aliases": [], + "new_room_id": None, + } + + def asdict(self, use_purge_history_format: bool = False) -> JsonDict: + if not use_purge_history_format: + ret = { + "delete_id": self.delete_id, + "status": self.status, + "shutdown_room": self.shutdown_room, + } + else: + ret = { + "status": self.status + if self.status == DeleteStatus.STATUS_COMPLETE + or self.status == DeleteStatus.STATUS_FAILED + else "active", + } + if self.error: + ret["error"] = self.error + return ret + + class RoomShutdownHandler: DEFAULT_MESSAGE = ( "Sharing illegal content on this server is not permitted and rooms in" @@ -1787,11 +1883,9 @@ class RoomShutdownHandler: async def shutdown_room( self, room_id: str, - requester_user_id: str, - new_room_user_id: Optional[str] = None, - new_room_name: Optional[str] = None, - message: Optional[str] = None, - block: bool = False, + delete_id: str, + shutdown_params: ShutdownRoomParams, + shutdown_response: Optional[ShutdownRoomResponse] = None, ) -> ShutdownRoomResponse: """ Shuts down a room. Moves all local users and room aliases automatically @@ -1808,48 +1902,27 @@ class RoomShutdownHandler: Args: room_id: The ID of the room to shut down. - requester_user_id: - User who requested the action and put the room on the - blocking list. - new_room_user_id: - If set, a new room will be created with this user ID - as the creator and admin, and all users in the old room will be - moved into that room. If not set, no new room will be created - and the users will just be removed from the old room. - new_room_name: - A string representing the name of the room that new users will - be invited to. Defaults to `Content Violation Notification` - message: - A string containing the first message that will be sent as - `new_room_user_id` in the new room. Ideally this will clearly - convey why the original room was shut down. - Defaults to `Sharing illegal content on this server is not - permitted and rooms in violation will be blocked.` - block: - If set to `True`, users will be prevented from joining the old - room. This option can also be used to pre-emptively block a room, - even if it's unknown to this homeserver. In this case, the room - will be blocked, and no further action will be taken. If `False`, - attempting to delete an unknown room is invalid. - - Defaults to `False`. - - Returns: a dict containing the following keys: - kicked_users: An array of users (`user_id`) that were kicked. - failed_to_kick_users: - An array of users (`user_id`) that that were not kicked. - local_aliases: - An array of strings representing the local aliases that were - migrated from the old room to the new. - new_room_id: - A string representing the room ID of the new room, or None if - no such room was created. + delete_id: The delete ID identifying this delete request + shutdown_params: parameters for the shutdown, cf `ShutdownRoomParams` + shutdown_response: current status of the shutdown, if it was interrupted + + Returns: a dict matching `ShutdownRoomResponse`. """ - if not new_room_name: - new_room_name = self.DEFAULT_ROOM_NAME - if not message: - message = self.DEFAULT_MESSAGE + requester_user_id = shutdown_params["requester_user_id"] + new_room_user_id = shutdown_params["new_room_user_id"] + block = shutdown_params["block"] + + new_room_name = ( + shutdown_params["new_room_name"] + if shutdown_params["new_room_name"] + else self.DEFAULT_ROOM_NAME + ) + message = ( + shutdown_params["message"] + if shutdown_params["message"] + else self.DEFAULT_MESSAGE + ) if not RoomID.is_valid(room_id): raise SynapseError(400, "%s is not a legal room ID" % (room_id,)) @@ -1861,6 +1934,23 @@ class RoomShutdownHandler: 403, "Shutdown of this room is forbidden", Codes.FORBIDDEN ) + if not shutdown_response: + shutdown_response = { + "kicked_users": [], + "failed_to_kick_users": [], + "local_aliases": [], + "new_room_id": None, + } + + await self.store.upsert_room_to_delete( + room_id, + delete_id, + DeleteStatus.ACTION_SHUTDOWN, + DeleteStatus.STATUS_SHUTTING_DOWN, + params=json.dumps(shutdown_params), + response=json.dumps(shutdown_response), + ) + # Action the block first (even if the room doesn't exist yet) if block: # This will work even if the room is already blocked, but that is @@ -1869,14 +1959,10 @@ class RoomShutdownHandler: if not await self.store.get_room(room_id): # if we don't know about the room, there is nothing left to do. - return { - "kicked_users": [], - "failed_to_kick_users": [], - "local_aliases": [], - "new_room_id": None, - } + return shutdown_response - if new_room_user_id is not None: + new_room_id = shutdown_response.get("new_room_id") + if new_room_user_id is not None and new_room_id is None: if not self.hs.is_mine_id(new_room_user_id): raise SynapseError( 400, "User must be our own: %s" % (new_room_user_id,) @@ -1896,6 +1982,16 @@ class RoomShutdownHandler: ratelimit=False, ) + shutdown_response["new_room_id"] = new_room_id + await self.store.upsert_room_to_delete( + room_id, + delete_id, + DeleteStatus.ACTION_SHUTDOWN, + DeleteStatus.STATUS_SHUTTING_DOWN, + params=json.dumps(shutdown_params), + response=json.dumps(shutdown_response), + ) + logger.info( "Shutting down room %r, joining to new room: %r", room_id, new_room_id ) @@ -1909,12 +2005,9 @@ class RoomShutdownHandler: stream_id, ) else: - new_room_id = None logger.info("Shutting down room %r", room_id) users = await self.store.get_users_in_room(room_id) - kicked_users = [] - failed_to_kick_users = [] for user_id in users: if not self.hs.is_mine_id(user_id): continue @@ -1943,7 +2036,9 @@ class RoomShutdownHandler: stream_id, ) - await self.room_member_handler.forget(target_requester.user, room_id) + await self.room_member_handler.forget( + target_requester.user, room_id, do_not_schedule_purge=True + ) # Join users to new room if new_room_user_id: @@ -1958,15 +2053,35 @@ class RoomShutdownHandler: require_consent=False, ) - kicked_users.append(user_id) + shutdown_response["kicked_users"].append(user_id) + await self.store.upsert_room_to_delete( + room_id, + delete_id, + DeleteStatus.ACTION_SHUTDOWN, + DeleteStatus.STATUS_SHUTTING_DOWN, + params=json.dumps(shutdown_params), + response=json.dumps(shutdown_response), + ) except Exception: logger.exception( "Failed to leave old room and join new room for %r", user_id ) - failed_to_kick_users.append(user_id) + shutdown_response["failed_to_kick_users"].append(user_id) + await self.store.upsert_room_to_delete( + room_id, + delete_id, + DeleteStatus.ACTION_SHUTDOWN, + DeleteStatus.STATUS_SHUTTING_DOWN, + params=json.dumps(shutdown_params), + response=json.dumps(shutdown_response), + ) # Send message in new room and move aliases if new_room_user_id: + room_creator_requester = create_requester( + new_room_user_id, authenticated_entity=requester_user_id + ) + await self.event_creation_handler.create_and_send_nonmember_event( room_creator_requester, { @@ -1978,18 +2093,15 @@ class RoomShutdownHandler: ratelimit=False, ) - aliases_for_room = await self.store.get_aliases_for_room(room_id) + shutdown_response["local_aliases"] = list( + await self.store.get_aliases_for_room(room_id) + ) assert new_room_id is not None await self.store.update_aliases_for_room( room_id, new_room_id, requester_user_id ) else: - aliases_for_room = [] + shutdown_response["local_aliases"] = [] - return { - "kicked_users": kicked_users, - "failed_to_kick_users": failed_to_kick_users, - "local_aliases": list(aliases_for_room), - "new_room_id": new_room_id, - } + return shutdown_response diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 82e4fa7363..e3147924f7 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -38,6 +38,7 @@ from synapse.event_auth import get_named_level, get_power_level_event from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN +from synapse.handlers.room import DeleteStatus from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.logging import opentracing from synapse.metrics import event_processing_positions @@ -56,6 +57,7 @@ from synapse.types import ( from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_left_room +from synapse.util.stringutils import random_string if TYPE_CHECKING: from synapse.server import HomeServer @@ -176,6 +178,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): self._msc3970_enabled = hs.config.experimental.msc3970_enabled + self._purge_retention_period = hs.config.server.purge_retention_period + def _on_user_joined_room(self, event_id: str, room_id: str) -> None: """Notify the rate limiter that a room join has occurred. @@ -285,7 +289,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): """ raise NotImplementedError() - async def forget(self, user: UserID, room_id: str) -> None: + async def forget( + self, user: UserID, room_id: str, do_not_schedule_purge: bool = False + ) -> None: user_id = user.to_string() member = await self._storage_controllers.state.get_current_state_event( @@ -305,6 +311,22 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # the table `current_state_events` and `get_current_state_events` is `None`. await self.store.forget(user_id, room_id) + # If everyone locally has left the room, then there is no reason for us to keep the + # room around and we automatically purge room after a little bit + if ( + not do_not_schedule_purge + and self._purge_retention_period + and await self.store.is_locally_forgotten_room(room_id) + ): + delete_id = random_string(16) + await self.store.upsert_room_to_delete( + room_id, + delete_id, + DeleteStatus.ACTION_PURGE, + DeleteStatus.STATUS_SCHEDULED, + timestamp=self.clock.time_msec() + self._purge_retention_period, + ) + async def ratelimit_multiple_invites( self, requester: Optional[Requester], diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index fe8177ed4d..0cabdd1dc6 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -196,7 +196,7 @@ class PurgeHistoryRestServlet(RestServlet): errcode=Codes.BAD_JSON, ) - purge_id = self.pagination_handler.start_purge_history( + purge_id = await self.pagination_handler.start_purge_history( room_id, token, delete_local_events=delete_local_events ) @@ -215,11 +215,12 @@ class PurgeHistoryStatusRestServlet(RestServlet): ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) - purge_status = self.pagination_handler.get_purge_status(purge_id) + purge_status = await self.pagination_handler.get_delete_status(purge_id) if purge_status is None: raise NotFoundError("purge id '%s' not found" % purge_id) - return HTTPStatus.OK, purge_status.asdict() + # TODO active vs purging etc + return HTTPStatus.OK, purge_status.asdict(use_purge_history_format=True) ######################################################################################## diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 1d65560265..9e31d018b1 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -19,6 +19,7 @@ from urllib import parse as urlparse from synapse.api.constants import Direction, EventTypes, JoinRules, Membership from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError from synapse.api.filtering import Filter +from synapse.handlers.room import DeleteStatus from synapse.http.servlet import ( ResolveRoomIdMixin, RestServlet, @@ -39,6 +40,7 @@ from synapse.streams.config import PaginationConfig from synapse.types import JsonDict, RoomID, UserID, create_requester from synapse.types.state import StateFilter from synapse.util import json_decoder +from synapse.util.stringutils import random_string if TYPE_CHECKING: from synapse.api.auth import Auth @@ -119,13 +121,15 @@ class RoomRestV2Servlet(RestServlet): delete_id = self._pagination_handler.start_shutdown_and_purge_room( room_id=room_id, - new_room_user_id=content.get("new_room_user_id"), - new_room_name=content.get("room_name"), - message=content.get("message"), - requester_user_id=requester.user.to_string(), - block=block, - purge=purge, - force_purge=force_purge, + shutdown_params={ + "new_room_user_id": content.get("new_room_user_id"), + "new_room_name": content.get("room_name"), + "message": content.get("message"), + "requester_user_id": requester.user.to_string(), + "block": block, + "purge": purge, + "force_purge": force_purge, + }, ) return HTTPStatus.OK, {"delete_id": delete_id} @@ -150,21 +154,21 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet): HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,) ) - delete_ids = self._pagination_handler.get_delete_ids_by_room(room_id) - if delete_ids is None: - raise NotFoundError("No delete task for room_id '%s' found" % room_id) + delete_statuses = await self._pagination_handler.get_delete_statuses_by_room( + room_id + ) response = [] - for delete_id in delete_ids: - delete = self._pagination_handler.get_delete_status(delete_id) - if delete: - response += [ - { - "delete_id": delete_id, - **delete.asdict(), - } - ] - return HTTPStatus.OK, {"results": cast(JsonDict, response)} + for delete_status in delete_statuses: + # We ignore scheduled deletes because currently they are only used + # for automatically purging forgotten room after X time. + if delete_status.status != DeleteStatus.STATUS_SCHEDULED: + response += [delete_status.asdict()] + + if response: + return HTTPStatus.OK, {"results": cast(JsonDict, response)} + else: + raise NotFoundError("No delete task for room_id '%s' found" % room_id) class DeleteRoomStatusByDeleteIdRestServlet(RestServlet): @@ -181,7 +185,7 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet): ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self._auth, request) - delete_status = self._pagination_handler.get_delete_status(delete_id) + delete_status = await self._pagination_handler.get_delete_status(delete_id) if delete_status is None: raise NotFoundError("delete id '%s' not found" % delete_id) @@ -347,19 +351,28 @@ class RoomRestServlet(RestServlet): Codes.BAD_JSON, ) + delete_id = random_string(16) + ret = await room_shutdown_handler.shutdown_room( room_id=room_id, - new_room_user_id=content.get("new_room_user_id"), - new_room_name=content.get("room_name"), - message=content.get("message"), - requester_user_id=requester.user.to_string(), - block=block, + delete_id=delete_id, + shutdown_params={ + "new_room_user_id": content.get("new_room_user_id"), + "new_room_name": content.get("room_name"), + "message": content.get("message"), + "requester_user_id": requester.user.to_string(), + "block": block, + "purge": purge, + "force_purge": force_purge, + }, ) # Purge room if purge: try: - await pagination_handler.purge_room(room_id, force=force_purge) + await pagination_handler.purge_room( + room_id, delete_id, force=force_purge + ) except NotFoundError: if block: # We can block unknown rooms with this endpoint, in which case diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 582875c91a..2c0b573d4c 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -17,6 +17,7 @@ from itertools import chain from typing import ( TYPE_CHECKING, AbstractSet, + Any, Collection, Dict, FrozenSet, @@ -1283,6 +1284,113 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): # If any rows still exist it means someone has not forgotten this room yet return not rows[0][0] + async def upsert_room_to_delete( + self, + room_id: str, + delete_id: str, + action: str, + status: str, + timestamp: Optional[int] = None, + params: Optional[str] = None, + response: Optional[str] = None, + error: Optional[str] = None, + ) -> None: + """Insert or update a room to shutdown/purge. + + Args: + room_id: The room ID to shutdown/purge + delete_id: The delete ID identifying this action + action: the type of job, mainly `shutdown` `purge` or `purge_history` + status: Current status of the delete. Cf `DeleteStatus` for possible values + timestamp: Time of the last update. If status is `wait_purge`, + then it specifies when to do the purge, with an empty value specifying ASAP + error: Error message to return, if any + params: JSON representation of delete job parameters + response: JSON representation of delete current status + """ + await self.db_pool.simple_upsert( + "rooms_to_delete", + { + "room_id": room_id, + "delete_id": delete_id, + }, + { + "action": action, + "status": status, + "timestamp": timestamp, + "params": params, + "response": response, + "error": error, + }, + desc="upsert_room_to_delete", + ) + + async def delete_room_to_delete(self, room_id: str, delete_id: str) -> None: + """Remove a room from the list of rooms to purge. + + Args: + room_id: The room ID matching the delete to remove + delete_id: The delete ID identifying the delete to remove + """ + + await self.db_pool.simple_delete( + "rooms_to_delete", + keyvalues={ + "room_id": room_id, + "delete_id": delete_id, + }, + desc="delete_room_to_delete", + ) + + async def get_rooms_to_delete( + self, room_id: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Returns all delete jobs. This includes those that have been + interrupted by a stop/restart of synapse, but also scheduled ones + like locally forgotten rooms. + + Args: + room_id: if specified, will only return the delete jobs for a specific room + + """ + keyvalues = {} + if room_id is not None: + keyvalues["room_id"] = room_id + + return await self.db_pool.simple_select_list( + table="rooms_to_delete", + keyvalues=keyvalues, + retcols=( + "room_id", + "delete_id", + "action", + "status", + "timestamp", + "params", + "response", + "error", + ), + desc="rooms_to_delete_fetch", + ) + + async def get_room_to_delete(self, delete_id: str) -> Optional[Dict[str, Any]]: + """Return the delete job identified by delete_id.""" + return await self.db_pool.simple_select_one( + table="rooms_to_delete", + keyvalues={"delete_id": delete_id}, + retcols=( + "room_id", + "delete_id", + "action", + "status", + "timestamp", + "params", + "response", + "error", + ), + desc="rooms_to_delete_fetch", + ) + async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]: """Get all rooms that the user has ever been in. diff --git a/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql b/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql new file mode 100644 index 0000000000..8f9c8c7010 --- /dev/null +++ b/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql @@ -0,0 +1,27 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- cf upsert_room_to_delete docstring for the meaning of the fields. +CREATE TABLE IF NOT EXISTS rooms_to_delete( + room_id text NOT NULL, + delete_id text NOT NULL, + action text NOT NULL, + status text NOT NULL, + timestamp bigint, + params text, + response text, + error text, + UNIQUE(room_id, delete_id) +); diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index eb50086c50..ba8afbc2b9 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -24,9 +24,10 @@ from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.api.constants import EventTypes, Membership, RoomTypes from synapse.api.errors import Codes -from synapse.handlers.pagination import PaginationHandler, PurgeStatus +from synapse.handlers.pagination import DeleteStatus, PaginationHandler from synapse.rest.client import directory, events, login, room from synapse.server import HomeServer +from synapse.types import UserID from synapse.util import Clock from synapse.util.stringutils import random_string @@ -35,6 +36,9 @@ from tests import unittest """Tests admin REST events for /rooms paths.""" +ONE_HOUR_IN_S = 3600 + + class DeleteRoomTestCase(unittest.HomeserverTestCase): servlets = [ synapse.rest.admin.register_servlets, @@ -502,6 +506,9 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): ) self.url_status_by_delete_id = "/_synapse/admin/v2/rooms/delete_status/" + self.room_member_handler = hs.get_room_member_handler() + self.pagination_handler = hs.get_pagination_handler() + @parameterized.expand( [ ("DELETE", "/_synapse/admin/v2/rooms/%s"), @@ -686,8 +693,10 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): self.assertEqual(2, len(channel.json_body["results"])) self.assertEqual("complete", channel.json_body["results"][0]["status"]) self.assertEqual("complete", channel.json_body["results"][1]["status"]) - self.assertEqual(delete_id1, channel.json_body["results"][0]["delete_id"]) - self.assertEqual(delete_id2, channel.json_body["results"][1]["delete_id"]) + delete_ids = {delete_id1, delete_id2} + self.assertTrue(channel.json_body["results"][0]["delete_id"] in delete_ids) + delete_ids.remove(channel.json_body["results"][0]["delete_id"]) + self.assertTrue(channel.json_body["results"][1]["delete_id"] in delete_ids) # get status after more than clearing time for first task # second task is not cleared @@ -742,7 +751,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): self.assertEqual(400, second_channel.code, msg=second_channel.json_body) self.assertEqual(Codes.UNKNOWN, second_channel.json_body["errcode"]) self.assertEqual( - f"History purge already in progress for {self.room_id}", + f"Purge already in progress for {self.room_id}", second_channel.json_body["error"], ) @@ -972,6 +981,121 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): # Assert we can no longer peek into the room self._assert_peek(self.room_id, expect_code=403) + @unittest.override_config({"purge_retention_period": "1d"}) + def test_purge_forgotten_room(self) -> None: + # Create a test room + room_id = self.helper.create_room_as( + self.admin_user, + tok=self.admin_user_tok, + ) + + self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok) + self.get_success( + self.room_member_handler.forget( + UserID.from_string(self.admin_user), room_id + ) + ) + + # Test that room is not yet purged + with self.assertRaises(AssertionError): + self._is_purged(room_id) + + # Advance 24 hours in the future, past the `purge_retention_period` + self.reactor.advance(24 * ONE_HOUR_IN_S) + + self._is_purged(room_id) + + def test_resume_purge_room(self) -> None: + # Create a test room + room_id = self.helper.create_room_as( + self.admin_user, + tok=self.admin_user_tok, + ) + self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok) + + self.get_success( + self.store.upsert_room_to_delete( + room_id, + random_string(16), + DeleteStatus.ACTION_PURGE, + DeleteStatus.STATUS_PURGING, + ) + ) + + # Test that room is not yet purged + with self.assertRaises(AssertionError): + self._is_purged(room_id) + + # Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that + # the automatic purging takes place and resumes the purge + self.reactor.advance(ONE_HOUR_IN_S) + + self._is_purged(room_id) + + def test_resume_shutdown_room(self) -> None: + # Create a test room + room_id = self.helper.create_room_as( + self.other_user, + tok=self.other_user_tok, + ) + + delete_id = random_string(16) + + self.get_success( + self.store.upsert_room_to_delete( + room_id, + delete_id, + DeleteStatus.ACTION_SHUTDOWN, + DeleteStatus.STATUS_SHUTTING_DOWN, + params=json.dumps( + { + "requester_user_id": self.admin_user, + "new_room_user_id": self.admin_user, + "new_room_name": None, + "message": None, + "block": False, + "purge": True, + "force_purge": True, + } + ), + ) + ) + + # Test that room is not yet shutdown + self._is_member(room_id, self.other_user) + + # Test that room is not yet purged + with self.assertRaises(AssertionError): + self._is_purged(room_id) + + # Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that + # the automatic purging takes place and resumes the purge + self.reactor.advance(ONE_HOUR_IN_S) + + # Test that all users has been kicked (room is shutdown) + self._has_no_members(room_id) + + self._is_purged(room_id) + + # Retrieve delete results + result = self.make_request( + "GET", + self.url_status_by_delete_id + delete_id, + access_token=self.admin_user_tok, + ) + self.assertEqual(200, result.code, msg=result.json_body) + + # Check that the user is in kicked_users + self.assertIn( + self.other_user, result.json_body["shutdown_room"]["kicked_users"] + ) + + new_room_id = result.json_body["shutdown_room"]["new_room_id"] + self.assertTrue(new_room_id) + + # Check that the user is actually in the new room + self._is_member(new_room_id, self.other_user) + def _is_blocked(self, room_id: str, expect: bool = True) -> None: """Assert that the room is blocked or not""" d = self.store.is_room_blocked(room_id) @@ -1958,13 +2082,13 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase): # Purge every event before the second event. purge_id = random_string(16) - pagination_handler._purges_by_id[purge_id] = PurgeStatus() self.get_success( pagination_handler._purge_history( purge_id=purge_id, room_id=self.room_id, token=second_token_str, delete_local_events=True, + update_rooms_to_delete_table=True, ) ) diff --git a/tests/rest/admin/test_server_notice.py b/tests/rest/admin/test_server_notice.py index 28b999573e..d14da9fd0e 100644 --- a/tests/rest/admin/test_server_notice.py +++ b/tests/rest/admin/test_server_notice.py @@ -22,6 +22,7 @@ from synapse.server import HomeServer from synapse.storage.roommember import RoomsForUser from synapse.types import JsonDict from synapse.util import Clock +from synapse.util.stringutils import random_string from tests import unittest from tests.unittest import override_config @@ -413,11 +414,25 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase): self.assertEqual(messages[0]["content"]["body"], "test msg one") self.assertEqual(messages[0]["sender"], "@notices:test") + delete_id = random_string(16) + # shut down and purge room self.get_success( - self.room_shutdown_handler.shutdown_room(first_room_id, self.admin_user) - ) - self.get_success(self.pagination_handler.purge_room(first_room_id)) + self.room_shutdown_handler.shutdown_room( + first_room_id, + delete_id, + { + "requester_user_id": self.admin_user, + "new_room_user_id": None, + "new_room_name": None, + "message": None, + "block": False, + "purge": True, + "force_purge": False, + }, + ) + ) + self.get_success(self.pagination_handler.purge_room(first_room_id, "delete_id")) # user is not member anymore self._check_invite_and_join_status(self.other_user, 0, 0) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index f1b4e1ad2f..98c3f99d11 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -41,7 +41,6 @@ from synapse.api.errors import Codes, HttpResponseException from synapse.appservice import ApplicationService from synapse.events import EventBase from synapse.events.snapshot import EventContext -from synapse.handlers.pagination import PurgeStatus from synapse.rest import admin from synapse.rest.client import account, directory, login, profile, register, room, sync from synapse.server import HomeServer @@ -2090,13 +2089,13 @@ class RoomMessageListTestCase(RoomBase): # Purge every event before the second event. purge_id = random_string(16) - pagination_handler._purges_by_id[purge_id] = PurgeStatus() self.get_success( pagination_handler._purge_history( purge_id=purge_id, room_id=self.room_id, token=second_token_str, delete_local_events=True, + update_rooms_to_delete_table=True, ) ) |