diff options
author | Mathieu Velten <matmaul@gmail.com> | 2023-07-17 14:42:23 +0200 |
---|---|---|
committer | Mathieu Velten <mathieuv@matrix.org> | 2023-07-25 12:24:55 +0200 |
commit | 5065d7df75a6ea96da7d5dd6a897ba4661f90c1e (patch) | |
tree | 863f1b42ddfe5f8399b926173c0d761ea42ff51c | |
parent | Merge branch 'mv/task-scheduler' into mv/purge-room-when-forgotten-wip (diff) | |
download | synapse-github/mv/purge-room-when-forgotten-wip.tar.xz |
Use task scheduler github/mv/purge-room-when-forgotten-wip mv/purge-room-when-forgotten-wip
-rw-r--r-- | synapse/handlers/pagination.py | 362 | ||||
-rw-r--r-- | synapse/handlers/room.py | 91 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 12 | ||||
-rw-r--r-- | synapse/rest/admin/__init__.py | 17 | ||||
-rw-r--r-- | synapse/rest/admin/rooms.py | 49 | ||||
-rw-r--r-- | synapse/storage/databases/main/roommember.py | 108 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql | 27 | ||||
-rw-r--r-- | tests/rest/admin/test_room.py | 112 | ||||
-rw-r--r-- | tests/rest/admin/test_server_notice.py | 5 | ||||
-rw-r--r-- | tests/rest/client/test_rooms.py | 5 |
10 files changed, 213 insertions, 575 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 982b38cc43..c52676709d 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -12,9 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import json import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set +from typing import TYPE_CHECKING, List, Optional, Set, Tuple from twisted.python.failure import Failure @@ -22,12 +21,19 @@ from synapse.api.constants import Direction, EventTypes, Membership from synapse.api.errors import SynapseError from synapse.api.filtering import Filter from synapse.events.utils import SerializeEventConfig -from synapse.handlers.room import DeleteStatus, ShutdownRoomParams, ShutdownRoomResponse +from synapse.handlers.room import ShutdownRoomParams from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.admin._base import assert_user_is_admin from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester, StreamKeyType +from synapse.types import ( + JsonDict, + JsonMapping, + Requester, + ScheduledTask, + StreamKeyType, + TaskStatus, +) from synapse.types.state import StateFilter from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string @@ -52,12 +58,6 @@ class PaginationHandler: paginating during a purge. """ - # when to remove a completed deletion/purge from the results map - CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours - - # how often to run the purge rooms loop - PURGE_ROOMS_INTERVAL_MS = 1000 * 3600 # 1 hour - def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() @@ -68,6 +68,7 @@ class PaginationHandler: self._server_name = hs.hostname self._room_shutdown_handler = hs.get_room_shutdown_handler() self._relations_handler = hs.get_relations_handler() + self._task_scheduler = hs.get_task_scheduler() self.pagination_lock = ReadWriteLock() # IDs of rooms in which there currently an active purge *or delete* operation. @@ -101,95 +102,11 @@ class PaginationHandler: job.longest_max_lifetime, ) - if self._is_master: - self.clock.looping_call( - run_as_background_process, - PaginationHandler.PURGE_ROOMS_INTERVAL_MS, - "purge_rooms", - self.purge_rooms, - ) - - async def purge_rooms(self) -> None: - """This takes care of restoring unfinished purge/shutdown rooms from the DB. - It also takes care to launch scheduled ones, like rooms that has been fully - forgotten. - - It should be run regularly. - """ - rooms_to_delete = await self.store.get_rooms_to_delete() - for r in rooms_to_delete: - room_id = r["room_id"] - delete_id = r["delete_id"] - status = r["status"] - action = r["action"] - timestamp = r["timestamp"] - - if ( - status == DeleteStatus.STATUS_COMPLETE - or status == DeleteStatus.STATUS_FAILED - ): - # remove the delete from the list 24 hours after it completes or fails - ms_since_completed = self.clock.time_msec() - timestamp - if ms_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS: - await self.store.delete_room_to_delete(room_id, delete_id) - - continue - - if room_id in self._purges_in_progress_by_room: - # a delete background task is already running (or has run) - # for this room id, let's ignore it for now - continue - - # If the database says we were last in the middle of shutting down the room, - # let's continue the shutdown process. - shutdown_response = None - if ( - action == DeleteStatus.ACTION_SHUTDOWN - and status == DeleteStatus.STATUS_SHUTTING_DOWN - ): - shutdown_params = json.loads(r["params"]) - if r["response"]: - shutdown_response = json.loads(r["response"]) - await self._shutdown_and_purge_room( - room_id, - delete_id, - shutdown_params=shutdown_params, - shutdown_response=shutdown_response, - ) - continue - - # If the database says we were last in the middle of purging the room, - # let's continue the purge process. - if status == DeleteStatus.STATUS_PURGING: - purge_now = True - # Or if we're at or past the scheduled purge time, let's start that one as well - elif status == DeleteStatus.STATUS_SCHEDULED and ( - timestamp is None or self.clock.time_msec() >= timestamp - ): - purge_now = True - - # TODO 2 stages purge, keep memberships for a while so we don't "break" sync - if purge_now: - params = {} - if r["params"]: - params = json.loads(r["params"]) - - if action == DeleteStatus.ACTION_PURGE_HISTORY: - if "token" in params: - await self._purge_history( - delete_id, - room_id, - params["token"], - params.get("delete_local_events", False), - True, - ) - elif action == DeleteStatus.ACTION_PURGE: - await self.purge_room( - room_id, - delete_id, - params.get("force", False), - shutdown_response=shutdown_response, - ) + self._task_scheduler.register_action(self._purge_history, "purge_history") + self._task_scheduler.register_action(self._purge_room, "purge_room") + self._task_scheduler.register_action( + self._shutdown_and_purge_room, "shutdown_and_purge_room" + ) async def purge_history_for_rooms_in_range( self, min_ms: Optional[int], max_ms: Optional[int] @@ -241,14 +158,6 @@ class PaginationHandler: for room_id, retention_policy in rooms.items(): logger.info("[purge] Attempting to purge messages in room %s", room_id) - if room_id in self._purges_in_progress_by_room: - logger.warning( - "[purge] not purging room %s as there's an ongoing purge running" - " for this room", - room_id, - ) - continue - # If max_lifetime is None, it means that the room has no retention policy. # Given we only retrieve such rooms when there's a default retention policy # defined in the server's configuration, we can safely assume that's the @@ -330,46 +239,49 @@ class PaginationHandler: Returns: unique ID for this purge transaction. """ - if room_id in self._purges_in_progress_by_room: - raise SynapseError( - 400, "History purge already in progress for %s" % (room_id,) - ) - - purge_id = random_string(16) + purge_id = await self._task_scheduler.schedule_task( + "purge_history", + resource_id=room_id, + params={"token": token, "delete_local_events": delete_local_events}, + ) # we log the purge_id here so that it can be tied back to the # request id in the log lines. logger.info("[purge] starting purge_id %s", purge_id) - await self.store.upsert_room_to_delete( - room_id, - purge_id, - DeleteStatus.ACTION_PURGE_HISTORY, - DeleteStatus.STATUS_PURGING, - params=json.dumps( - {"token": token, "delete_local_events": delete_local_events} - ), - ) - - run_as_background_process( - "purge_history", - self._purge_history, - purge_id, - room_id, - token, - delete_local_events, - True, - ) return purge_id async def _purge_history( self, - purge_id: str, + task: ScheduledTask, + first_launch: bool, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + if ( + task.resource_id is None + or task.params is None + or "token" not in task.params + or "delete_local_events" not in task.params + ): + return ( + TaskStatus.FAILED, + None, + "Not enough parameters passed to _purge_history", + ) + err = await self.purge_history( + task.resource_id, + task.params["token"], + task.params["delete_local_events"], + ) + if err is not None: + return TaskStatus.FAILED, None, err + return TaskStatus.COMPLETE, None, None + + async def purge_history( + self, room_id: str, token: str, delete_local_events: bool, - update_rooms_to_delete_table: bool, - ) -> None: + ) -> Optional[str]: """Carry out a history purge on a room. Args: @@ -382,88 +294,54 @@ class PaginationHandler: functionality since we don't need to explicitly restore those, they will be relaunch by the retention logic. """ - self._purges_in_progress_by_room.add(room_id) try: async with self.pagination_lock.write(room_id): await self._storage_controllers.purge_events.purge_history( room_id, token, delete_local_events ) logger.info("[purge] complete") - if update_rooms_to_delete_table: - await self.store.upsert_room_to_delete( - room_id, - purge_id, - DeleteStatus.ACTION_PURGE_HISTORY, - DeleteStatus.STATUS_COMPLETE, - timestamp=self.clock.time_msec(), - ) + return None except Exception: f = Failure() logger.error( "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject()) ) - if update_rooms_to_delete_table: - await self.store.upsert_room_to_delete( - room_id, - purge_id, - DeleteStatus.ACTION_PURGE_HISTORY, - DeleteStatus.STATUS_FAILED, - error=f.getErrorMessage(), - timestamp=self.clock.time_msec(), - ) - finally: - self._purges_in_progress_by_room.discard(room_id) - - if update_rooms_to_delete_table: - # remove the purge from the list 24 hours after it completes - async def clear_purge() -> None: - await self.store.delete_room_to_delete(room_id, purge_id) - - self.hs.get_reactor().callLater( - PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge - ) + return f.getErrorMessage() - @staticmethod - def _convert_to_delete_status(res: Dict[str, Any]) -> DeleteStatus: - status = DeleteStatus() - status.delete_id = res["delete_id"] - status.action = res["action"] - status.status = res["status"] - if "error" in res: - status.error = res["error"] - - if status.action == DeleteStatus.ACTION_SHUTDOWN and res["response"]: - status.shutdown_room = json.loads(res["response"]) - - return status - - async def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]: + async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]: """Get the current status of an active deleting Args: delete_id: delete_id returned by start_shutdown_and_purge_room or start_purge_history. """ - res = await self.store.get_room_to_delete(delete_id) - if res: - return PaginationHandler._convert_to_delete_status(res) - return None + return await self._task_scheduler.get_task(delete_id) - async def get_delete_statuses_by_room(self, room_id: str) -> List[DeleteStatus]: + async def get_delete_tasks_by_room(self, room_id: str) -> List[ScheduledTask]: """Get all active delete statuses by room Args: room_id: room_id that is deleted """ - res = await self.store.get_rooms_to_delete(room_id) - return [PaginationHandler._convert_to_delete_status(r) for r in res] + return await self._task_scheduler.get_tasks( + actions=["purge_room", "shutdown_and_purge_room"], resource_ids=[room_id] + ) + + async def _purge_room( + self, + task: ScheduledTask, + first_launch: bool, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + if not task.resource_id: + raise Exception("No room id passed to purge_room task") + params = task.params if task.params else {} + await self.purge_room(task.resource_id, params.get("force", False)) + return TaskStatus.COMPLETE, None, None async def purge_room( self, room_id: str, - delete_id: str, - force: bool = False, - shutdown_response: Optional[ShutdownRoomResponse] = None, + force: bool, ) -> None: """Purge the given room from the database. @@ -475,10 +353,6 @@ class PaginationHandler: """ logger.info("starting purge room_id=%s force=%s", room_id, force) - action = DeleteStatus.ACTION_PURGE - if shutdown_response: - action = DeleteStatus.ACTION_SHUTDOWN - async with self.pagination_lock.write(room_id): # first check that we have no users in this room joined = await self.store.is_host_joined(room_id, self._server_name) @@ -491,25 +365,8 @@ class PaginationHandler: else: raise SynapseError(400, "Users are still joined to this room") - await self.store.upsert_room_to_delete( - room_id, - delete_id, - action, - DeleteStatus.STATUS_PURGING, - response=json.dumps(shutdown_response), - ) - await self._storage_controllers.purge_events.purge_room(room_id) - await self.store.upsert_room_to_delete( - room_id, - delete_id, - action, - DeleteStatus.STATUS_COMPLETE, - timestamp=self.clock.time_msec(), - response=json.dumps(shutdown_response), - ) - logger.info("purge complete for room_id %s", room_id) @trace @@ -789,11 +646,9 @@ class PaginationHandler: async def _shutdown_and_purge_room( self, - room_id: str, - delete_id: str, - shutdown_params: ShutdownRoomParams, - shutdown_response: Optional[ShutdownRoomResponse] = None, - ) -> None: + task: ScheduledTask, + first_launch: bool, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: """ Shuts down and purges a room. @@ -807,50 +662,36 @@ class PaginationHandler: Keeps track of the `DeleteStatus` (and `ShutdownRoomResponse`) in `self._delete_by_id` and persisted in DB """ - - self._purges_in_progress_by_room.add(room_id) - try: - shutdown_response = await self._room_shutdown_handler.shutdown_room( - room_id=room_id, - delete_id=delete_id, - shutdown_params=shutdown_params, - shutdown_response=shutdown_response, + if task.resource_id is None or task.params is None: + raise Exception( + "No room id and/or no parameters passed to shutdown_and_purge_room task" ) - if shutdown_params["purge"]: - await self.purge_room( - room_id, - delete_id, - shutdown_params["force_purge"], - shutdown_response=shutdown_response, - ) + room_id = task.resource_id - await self.store.upsert_room_to_delete( - room_id, - delete_id, - DeleteStatus.ACTION_SHUTDOWN, - DeleteStatus.STATUS_COMPLETE, - timestamp=self.clock.time_msec(), - response=json.dumps(shutdown_response), - ) - except Exception: - f = Failure() - logger.error( - "failed", - exc_info=(f.type, f.value, f.getTracebackObject()), - ) - await self.store.upsert_room_to_delete( + async def update_result(result: Optional[JsonMapping]) -> None: + await self._task_scheduler.update_task(task.id, result=result) + + shutdown_result = await self._room_shutdown_handler.shutdown_room( + room_id, task.params, task.result, update_result + ) + + if task.params.get("purge", False): + await self.purge_room( room_id, - delete_id, - DeleteStatus.ACTION_SHUTDOWN, - DeleteStatus.STATUS_FAILED, - timestamp=self.clock.time_msec(), - error=f.getErrorMessage(), + task.params.get("force_purge", False), ) - finally: - self._purges_in_progress_by_room.discard(room_id) - def start_shutdown_and_purge_room( + return (TaskStatus.COMPLETE, shutdown_result, None) + + async def get_current_delete_tasks(self, room_id: str) -> List[ScheduledTask]: + return await self._task_scheduler.get_tasks( + actions=["purge_history", "purge_room", "shutdown_and_purge_room"], + resource_ids=[room_id], + statuses=[TaskStatus.ACTIVE, TaskStatus.SCHEDULED], + ) + + async def start_shutdown_and_purge_room( self, room_id: str, shutdown_params: ShutdownRoomParams, @@ -864,7 +705,7 @@ class PaginationHandler: Returns: unique ID for this delete transaction. """ - if room_id in self._purges_in_progress_by_room: + if len(await self.get_current_delete_tasks(room_id)) > 0: raise SynapseError(400, "Purge already in progress for %s" % (room_id,)) # This check is double to `RoomShutdownHandler.shutdown_room` @@ -877,7 +718,11 @@ class PaginationHandler: 400, "User must be our own: %s" % (new_room_user_id,) ) - delete_id = random_string(16) + delete_id = await self._task_scheduler.schedule_task( + "shutdown_and_purge_room", + resource_id=room_id, + params=shutdown_params, + ) # we log the delete_id here so that it can be tied back to the # request id in the log lines. @@ -887,11 +732,4 @@ class PaginationHandler: delete_id, ) - run_as_background_process( - "shutdown_and_purge_room", - self._shutdown_and_purge_room, - room_id, - delete_id, - shutdown_params, - ) return delete_id diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f7f9d9d2f5..ce678d41fc 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -14,14 +14,13 @@ """Contains functions for performing actions on rooms.""" import itertools -import json import logging import math import random import string from collections import OrderedDict from http import HTTPStatus -from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple import attr from typing_extensions import TypedDict @@ -59,6 +58,7 @@ from synapse.rest.admin._base import assert_user_is_admin from synapse.streams import EventSource from synapse.types import ( JsonDict, + JsonMapping, MutableStateMap, Requester, RoomAlias, @@ -1883,10 +1883,12 @@ class RoomShutdownHandler: async def shutdown_room( self, room_id: str, - delete_id: str, - shutdown_params: ShutdownRoomParams, - shutdown_response: Optional[ShutdownRoomResponse] = None, - ) -> ShutdownRoomResponse: + params: JsonMapping, + result: Optional[JsonMapping] = None, + update_result_fct: Optional[ + Callable[[Optional[JsonMapping]], Awaitable[None]] + ] = None, + ) -> Optional[JsonMapping]: """ Shuts down a room. Moves all local users and room aliases automatically to a new room if `new_room_user_id` is set. Otherwise local users only @@ -1908,21 +1910,16 @@ class RoomShutdownHandler: Returns: a dict matching `ShutdownRoomResponse`. """ - - requester_user_id = shutdown_params["requester_user_id"] - new_room_user_id = shutdown_params["new_room_user_id"] - block = shutdown_params["block"] + requester_user_id = params["requester_user_id"] + new_room_user_id = params["new_room_user_id"] + block = params["block"] new_room_name = ( - shutdown_params["new_room_name"] - if shutdown_params["new_room_name"] + params["new_room_name"] + if params["new_room_name"] else self.DEFAULT_ROOM_NAME ) - message = ( - shutdown_params["message"] - if shutdown_params["message"] - else self.DEFAULT_MESSAGE - ) + message = params["message"] if params["message"] else self.DEFAULT_MESSAGE if not RoomID.is_valid(room_id): raise SynapseError(400, "%s is not a legal room ID" % (room_id,)) @@ -1934,21 +1931,15 @@ class RoomShutdownHandler: 403, "Shutdown of this room is forbidden", Codes.FORBIDDEN ) - if not shutdown_response: - shutdown_response = { + result = ( + dict(result) + if result + else { "kicked_users": [], "failed_to_kick_users": [], "local_aliases": [], "new_room_id": None, } - - await self.store.upsert_room_to_delete( - room_id, - delete_id, - DeleteStatus.ACTION_SHUTDOWN, - DeleteStatus.STATUS_SHUTTING_DOWN, - params=json.dumps(shutdown_params), - response=json.dumps(shutdown_response), ) # Action the block first (even if the room doesn't exist yet) @@ -1959,9 +1950,9 @@ class RoomShutdownHandler: if not await self.store.get_room(room_id): # if we don't know about the room, there is nothing left to do. - return shutdown_response + return result - new_room_id = shutdown_response.get("new_room_id") + new_room_id = result.get("new_room_id") if new_room_user_id is not None and new_room_id is None: if not self.hs.is_mine_id(new_room_user_id): raise SynapseError( @@ -1982,15 +1973,9 @@ class RoomShutdownHandler: ratelimit=False, ) - shutdown_response["new_room_id"] = new_room_id - await self.store.upsert_room_to_delete( - room_id, - delete_id, - DeleteStatus.ACTION_SHUTDOWN, - DeleteStatus.STATUS_SHUTTING_DOWN, - params=json.dumps(shutdown_params), - response=json.dumps(shutdown_response), - ) + result["new_room_id"] = new_room_id + if update_result_fct: + await update_result_fct(result) logger.info( "Shutting down room %r, joining to new room: %r", room_id, new_room_id @@ -2053,28 +2038,16 @@ class RoomShutdownHandler: require_consent=False, ) - shutdown_response["kicked_users"].append(user_id) - await self.store.upsert_room_to_delete( - room_id, - delete_id, - DeleteStatus.ACTION_SHUTDOWN, - DeleteStatus.STATUS_SHUTTING_DOWN, - params=json.dumps(shutdown_params), - response=json.dumps(shutdown_response), - ) + result["kicked_users"].append(user_id) + if update_result_fct: + await update_result_fct(result) except Exception: logger.exception( "Failed to leave old room and join new room for %r", user_id ) - shutdown_response["failed_to_kick_users"].append(user_id) - await self.store.upsert_room_to_delete( - room_id, - delete_id, - DeleteStatus.ACTION_SHUTDOWN, - DeleteStatus.STATUS_SHUTTING_DOWN, - params=json.dumps(shutdown_params), - response=json.dumps(shutdown_response), - ) + result["failed_to_kick_users"].append(user_id) + if update_result_fct: + await update_result_fct(result) # Send message in new room and move aliases if new_room_user_id: @@ -2093,7 +2066,7 @@ class RoomShutdownHandler: ratelimit=False, ) - shutdown_response["local_aliases"] = list( + result["local_aliases"] = list( await self.store.get_aliases_for_room(room_id) ) @@ -2102,6 +2075,6 @@ class RoomShutdownHandler: room_id, new_room_id, requester_user_id ) else: - shutdown_response["local_aliases"] = [] + result["local_aliases"] = [] - return shutdown_response + return result diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index e3147924f7..db98cf856c 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -38,7 +38,6 @@ from synapse.event_auth import get_named_level, get_power_level_event from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN -from synapse.handlers.room import DeleteStatus from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.logging import opentracing from synapse.metrics import event_processing_positions @@ -57,7 +56,6 @@ from synapse.types import ( from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_left_room -from synapse.util.stringutils import random_string if TYPE_CHECKING: from synapse.server import HomeServer @@ -96,6 +94,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): self.event_creation_handler = hs.get_event_creation_handler() self.account_data_handler = hs.get_account_data_handler() self.event_auth_handler = hs.get_event_auth_handler() + self.task_scheduler = hs.get_task_scheduler() self.member_linearizer: Linearizer = Linearizer(name="member") self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter") @@ -318,12 +317,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): and self._purge_retention_period and await self.store.is_locally_forgotten_room(room_id) ): - delete_id = random_string(16) - await self.store.upsert_room_to_delete( - room_id, - delete_id, - DeleteStatus.ACTION_PURGE, - DeleteStatus.STATUS_SCHEDULED, + await self.task_scheduler.schedule_task( + "purge_room", + resource_id=room_id, timestamp=self.clock.time_msec() + self._purge_retention_period, ) diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 0cabdd1dc6..fafa9ea428 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -93,7 +93,7 @@ from synapse.rest.admin.users import ( UserTokenRestServlet, WhoisRestServlet, ) -from synapse.types import JsonDict, RoomStreamToken +from synapse.types import JsonDict, RoomStreamToken, TaskStatus from synapse.util import SYNAPSE_VERSION if TYPE_CHECKING: @@ -215,12 +215,21 @@ class PurgeHistoryStatusRestServlet(RestServlet): ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) - purge_status = await self.pagination_handler.get_delete_status(purge_id) - if purge_status is None: + purge_task = await self.pagination_handler.get_delete_task(purge_id) + if purge_task is None or purge_task.action != "purge_history": raise NotFoundError("purge id '%s' not found" % purge_id) + result = { + "status": purge_task.status + if purge_task.status == TaskStatus.COMPLETE + or purge_task.status == TaskStatus.FAILED + else "active", + } + if purge_task.error: + result["error"] = purge_task.error + # TODO active vs purging etc - return HTTPStatus.OK, purge_status.asdict(use_purge_history_format=True) + return HTTPStatus.OK, result ######################################################################################## diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 9e31d018b1..62cdb9af38 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -19,7 +19,6 @@ from urllib import parse as urlparse from synapse.api.constants import Direction, EventTypes, JoinRules, Membership from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError from synapse.api.filtering import Filter -from synapse.handlers.room import DeleteStatus from synapse.http.servlet import ( ResolveRoomIdMixin, RestServlet, @@ -37,10 +36,16 @@ from synapse.rest.admin._base import ( ) from synapse.storage.databases.main.room import RoomSortOrder from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, RoomID, UserID, create_requester +from synapse.types import ( + JsonDict, + RoomID, + ScheduledTask, + TaskStatus, + UserID, + create_requester, +) from synapse.types.state import StateFilter from synapse.util import json_decoder -from synapse.util.stringutils import random_string if TYPE_CHECKING: from synapse.api.auth import Auth @@ -119,7 +124,7 @@ class RoomRestV2Servlet(RestServlet): 403, "Shutdown of this room is forbidden", Codes.FORBIDDEN ) - delete_id = self._pagination_handler.start_shutdown_and_purge_room( + delete_id = await self._pagination_handler.start_shutdown_and_purge_room( room_id=room_id, shutdown_params={ "new_room_user_id": content.get("new_room_user_id"), @@ -135,6 +140,14 @@ class RoomRestV2Servlet(RestServlet): return HTTPStatus.OK, {"delete_id": delete_id} +def _convert_delete_task_to_response(task: ScheduledTask) -> JsonDict: + return { + "delete_id": task.id, + "status": task.status, + "shutdown_room": task.result, + } + + class DeleteRoomStatusByRoomIdRestServlet(RestServlet): """Get the status of the delete room background task.""" @@ -154,16 +167,14 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet): HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,) ) - delete_statuses = await self._pagination_handler.get_delete_statuses_by_room( - room_id - ) + delete_tasks = await self._pagination_handler.get_delete_tasks_by_room(room_id) response = [] - for delete_status in delete_statuses: + for delete_task in delete_tasks: # We ignore scheduled deletes because currently they are only used # for automatically purging forgotten room after X time. - if delete_status.status != DeleteStatus.STATUS_SCHEDULED: - response += [delete_status.asdict()] + if delete_task.status != TaskStatus.SCHEDULED: + response += [_convert_delete_task_to_response(delete_task)] if response: return HTTPStatus.OK, {"results": cast(JsonDict, response)} @@ -185,11 +196,14 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet): ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self._auth, request) - delete_status = await self._pagination_handler.get_delete_status(delete_id) - if delete_status is None: + delete_task = await self._pagination_handler.get_delete_task(delete_id) + if delete_task is None or ( + delete_task.action != "purge_room" + and delete_task.action != "shutdown_and_purge_room" + ): raise NotFoundError("delete id '%s' not found" % delete_id) - return HTTPStatus.OK, cast(JsonDict, delete_status.asdict()) + return HTTPStatus.OK, _convert_delete_task_to_response(delete_task) class ListRoomRestServlet(RestServlet): @@ -351,12 +365,9 @@ class RoomRestServlet(RestServlet): Codes.BAD_JSON, ) - delete_id = random_string(16) - ret = await room_shutdown_handler.shutdown_room( room_id=room_id, - delete_id=delete_id, - shutdown_params={ + params={ "new_room_user_id": content.get("new_room_user_id"), "new_room_name": content.get("room_name"), "message": content.get("message"), @@ -370,9 +381,7 @@ class RoomRestServlet(RestServlet): # Purge room if purge: try: - await pagination_handler.purge_room( - room_id, delete_id, force=force_purge - ) + await pagination_handler.purge_room(room_id, force=force_purge) except NotFoundError: if block: # We can block unknown rooms with this endpoint, in which case diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 2c0b573d4c..582875c91a 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -17,7 +17,6 @@ from itertools import chain from typing import ( TYPE_CHECKING, AbstractSet, - Any, Collection, Dict, FrozenSet, @@ -1284,113 +1283,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): # If any rows still exist it means someone has not forgotten this room yet return not rows[0][0] - async def upsert_room_to_delete( - self, - room_id: str, - delete_id: str, - action: str, - status: str, - timestamp: Optional[int] = None, - params: Optional[str] = None, - response: Optional[str] = None, - error: Optional[str] = None, - ) -> None: - """Insert or update a room to shutdown/purge. - - Args: - room_id: The room ID to shutdown/purge - delete_id: The delete ID identifying this action - action: the type of job, mainly `shutdown` `purge` or `purge_history` - status: Current status of the delete. Cf `DeleteStatus` for possible values - timestamp: Time of the last update. If status is `wait_purge`, - then it specifies when to do the purge, with an empty value specifying ASAP - error: Error message to return, if any - params: JSON representation of delete job parameters - response: JSON representation of delete current status - """ - await self.db_pool.simple_upsert( - "rooms_to_delete", - { - "room_id": room_id, - "delete_id": delete_id, - }, - { - "action": action, - "status": status, - "timestamp": timestamp, - "params": params, - "response": response, - "error": error, - }, - desc="upsert_room_to_delete", - ) - - async def delete_room_to_delete(self, room_id: str, delete_id: str) -> None: - """Remove a room from the list of rooms to purge. - - Args: - room_id: The room ID matching the delete to remove - delete_id: The delete ID identifying the delete to remove - """ - - await self.db_pool.simple_delete( - "rooms_to_delete", - keyvalues={ - "room_id": room_id, - "delete_id": delete_id, - }, - desc="delete_room_to_delete", - ) - - async def get_rooms_to_delete( - self, room_id: Optional[str] = None - ) -> List[Dict[str, Any]]: - """Returns all delete jobs. This includes those that have been - interrupted by a stop/restart of synapse, but also scheduled ones - like locally forgotten rooms. - - Args: - room_id: if specified, will only return the delete jobs for a specific room - - """ - keyvalues = {} - if room_id is not None: - keyvalues["room_id"] = room_id - - return await self.db_pool.simple_select_list( - table="rooms_to_delete", - keyvalues=keyvalues, - retcols=( - "room_id", - "delete_id", - "action", - "status", - "timestamp", - "params", - "response", - "error", - ), - desc="rooms_to_delete_fetch", - ) - - async def get_room_to_delete(self, delete_id: str) -> Optional[Dict[str, Any]]: - """Return the delete job identified by delete_id.""" - return await self.db_pool.simple_select_one( - table="rooms_to_delete", - keyvalues={"delete_id": delete_id}, - retcols=( - "room_id", - "delete_id", - "action", - "status", - "timestamp", - "params", - "response", - "error", - ), - desc="rooms_to_delete_fetch", - ) - async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]: """Get all rooms that the user has ever been in. diff --git a/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql b/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql deleted file mode 100644 index 8f9c8c7010..0000000000 --- a/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql +++ /dev/null @@ -1,27 +0,0 @@ -/* Copyright 2023 The Matrix.org Foundation C.I.C - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- cf upsert_room_to_delete docstring for the meaning of the fields. -CREATE TABLE IF NOT EXISTS rooms_to_delete( - room_id text NOT NULL, - delete_id text NOT NULL, - action text NOT NULL, - status text NOT NULL, - timestamp bigint, - params text, - response text, - error text, - UNIQUE(room_id, delete_id) -); diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index ba8afbc2b9..6b1138acb6 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -24,12 +24,11 @@ from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.api.constants import EventTypes, Membership, RoomTypes from synapse.api.errors import Codes -from synapse.handlers.pagination import DeleteStatus, PaginationHandler from synapse.rest.client import directory, events, login, room from synapse.server import HomeServer from synapse.types import UserID from synapse.util import Clock -from synapse.util.stringutils import random_string +from synapse.util.task_scheduler import TaskScheduler from tests import unittest @@ -50,6 +49,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.event_creation_handler = hs.get_event_creation_handler() + self.task_scheduler = hs.get_task_scheduler() hs.config.consent.user_consent_version = "1" consent_uri_builder = Mock() @@ -480,6 +480,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.event_creation_handler = hs.get_event_creation_handler() + self.task_scheduler = hs.get_task_scheduler() hs.config.consent.user_consent_version = "1" consent_uri_builder = Mock() @@ -668,7 +669,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): delete_id1 = channel.json_body["delete_id"] # go ahead - self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2) + self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2) # second task channel = self.make_request( @@ -700,7 +701,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): # get status after more than clearing time for first task # second task is not cleared - self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2) + self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2) channel = self.make_request( "GET", @@ -714,7 +715,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): self.assertEqual(delete_id2, channel.json_body["results"][0]["delete_id"]) # get status after more than clearing time for all tasks - self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2) + self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2) channel = self.make_request( "GET", @@ -725,48 +726,6 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): self.assertEqual(404, channel.code, msg=channel.json_body) self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"]) - def test_delete_same_room_twice(self) -> None: - """Test that the call for delete a room at second time gives an exception.""" - - body = {"new_room_user_id": self.admin_user} - - # first call to delete room - # and do not wait for finish the task - first_channel = self.make_request( - "DELETE", - self.url.encode("ascii"), - content=body, - access_token=self.admin_user_tok, - await_result=False, - ) - - # second call to delete room - second_channel = self.make_request( - "DELETE", - self.url.encode("ascii"), - content=body, - access_token=self.admin_user_tok, - ) - - self.assertEqual(400, second_channel.code, msg=second_channel.json_body) - self.assertEqual(Codes.UNKNOWN, second_channel.json_body["errcode"]) - self.assertEqual( - f"Purge already in progress for {self.room_id}", - second_channel.json_body["error"], - ) - - # get result of first call - first_channel.await_result() - self.assertEqual(200, first_channel.code, msg=first_channel.json_body) - self.assertIn("delete_id", first_channel.json_body) - - # check status after finish the task - self._test_result( - first_channel.json_body["delete_id"], - self.other_user, - expect_new_room=True, - ) - def test_purge_room_and_block(self) -> None: """Test to purge a room and block it. Members will not be moved to a new room and will not receive a message. @@ -1005,7 +964,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): self._is_purged(room_id) - def test_resume_purge_room(self) -> None: + def test_scheduled_purge_room(self) -> None: # Create a test room room_id = self.helper.create_room_as( self.admin_user, @@ -1013,12 +972,12 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): ) self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok) + # Schedule a purge 10 seconds in the future self.get_success( - self.store.upsert_room_to_delete( - room_id, - random_string(16), - DeleteStatus.ACTION_PURGE, - DeleteStatus.STATUS_PURGING, + self.task_scheduler.schedule_task( + "purge_room", + resource_id=room_id, + timestamp=self.clock.time_msec() + 10 * 1000, ) ) @@ -1026,38 +985,34 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): with self.assertRaises(AssertionError): self._is_purged(room_id) - # Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that - # the automatic purging takes place and resumes the purge + # Advance one hour in the future past `TaskScheduler.SCHEDULE_INTERVAL_MS` so that + # the automatic purging takes place and launch the purge self.reactor.advance(ONE_HOUR_IN_S) self._is_purged(room_id) - def test_resume_shutdown_room(self) -> None: + def test_schedule_shutdown_room(self) -> None: # Create a test room room_id = self.helper.create_room_as( self.other_user, tok=self.other_user_tok, ) - delete_id = random_string(16) - - self.get_success( - self.store.upsert_room_to_delete( - room_id, - delete_id, - DeleteStatus.ACTION_SHUTDOWN, - DeleteStatus.STATUS_SHUTTING_DOWN, - params=json.dumps( - { - "requester_user_id": self.admin_user, - "new_room_user_id": self.admin_user, - "new_room_name": None, - "message": None, - "block": False, - "purge": True, - "force_purge": True, - } - ), + # Schedule a shutdown 10 seconds in the future + delete_id = self.get_success( + self.task_scheduler.schedule_task( + "shutdown_and_purge_room", + resource_id=room_id, + params={ + "requester_user_id": self.admin_user, + "new_room_user_id": self.admin_user, + "new_room_name": None, + "message": None, + "block": False, + "purge": True, + "force_purge": True, + }, + timestamp=self.clock.time_msec() + 10 * 1000, ) ) @@ -1068,7 +1023,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase): with self.assertRaises(AssertionError): self._is_purged(room_id) - # Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that + # Advance one hour in the future past `TaskScheduler.SCHEDULE_INTERVAL_MS` so that # the automatic purging takes place and resumes the purge self.reactor.advance(ONE_HOUR_IN_S) @@ -2081,14 +2036,11 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase): self.assertEqual(len(chunk), 2, [event["content"] for event in chunk]) # Purge every event before the second event. - purge_id = random_string(16) self.get_success( - pagination_handler._purge_history( - purge_id=purge_id, + pagination_handler.purge_history( room_id=self.room_id, token=second_token_str, delete_local_events=True, - update_rooms_to_delete_table=True, ) ) diff --git a/tests/rest/admin/test_server_notice.py b/tests/rest/admin/test_server_notice.py index d14da9fd0e..dfd14f5751 100644 --- a/tests/rest/admin/test_server_notice.py +++ b/tests/rest/admin/test_server_notice.py @@ -414,13 +414,12 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase): self.assertEqual(messages[0]["content"]["body"], "test msg one") self.assertEqual(messages[0]["sender"], "@notices:test") - delete_id = random_string(16) + random_string(16) # shut down and purge room self.get_success( self.room_shutdown_handler.shutdown_room( first_room_id, - delete_id, { "requester_user_id": self.admin_user, "new_room_user_id": None, @@ -432,7 +431,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase): }, ) ) - self.get_success(self.pagination_handler.purge_room(first_room_id, "delete_id")) + self.get_success(self.pagination_handler.purge_room(first_room_id, force=False)) # user is not member anymore self._check_invite_and_join_status(self.other_user, 0, 0) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 98c3f99d11..88643cca9e 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -2088,14 +2088,11 @@ class RoomMessageListTestCase(RoomBase): self.assertEqual(len(chunk), 2, [event["content"] for event in chunk]) # Purge every event before the second event. - purge_id = random_string(16) self.get_success( - pagination_handler._purge_history( - purge_id=purge_id, + pagination_handler.purge_history( room_id=self.room_id, token=second_token_str, delete_local_events=True, - update_rooms_to_delete_table=True, ) ) |