diff options
Diffstat (limited to 'synapse/handlers/pagination.py')
-rw-r--r-- | synapse/handlers/pagination.py | 464 |
1 files changed, 155 insertions, 309 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 19cf5a2b43..878f267a4e 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -13,9 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Set - -import attr +from typing import TYPE_CHECKING, List, Optional, Set, Tuple, cast from twisted.python.failure import Failure @@ -23,16 +21,22 @@ from synapse.api.constants import Direction, EventTypes, Membership from synapse.api.errors import SynapseError from synapse.api.filtering import Filter from synapse.events.utils import SerializeEventConfig -from synapse.handlers.room import ShutdownRoomResponse +from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.admin._base import assert_user_is_admin from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType +from synapse.types import ( + JsonDict, + JsonMapping, + Requester, + ScheduledTask, + StreamKeyType, + TaskStatus, +) from synapse.types.state import StateFilter from synapse.util.async_helpers import ReadWriteLock -from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -53,80 +57,11 @@ BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3 PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock" -@attr.s(slots=True, auto_attribs=True) -class PurgeStatus: - """Object tracking the status of a purge request - - This class contains information on the progress of a purge request, for - return by get_purge_status. - """ - - STATUS_ACTIVE = 0 - STATUS_COMPLETE = 1 - STATUS_FAILED = 2 - - STATUS_TEXT = { - STATUS_ACTIVE: "active", - STATUS_COMPLETE: "complete", - STATUS_FAILED: "failed", - } - - # Save the error message if an error occurs - error: str = "" - - # Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}. - status: int = STATUS_ACTIVE - - def asdict(self) -> JsonDict: - ret = {"status": PurgeStatus.STATUS_TEXT[self.status]} - if self.error: - ret["error"] = self.error - return ret - - -@attr.s(slots=True, auto_attribs=True) -class DeleteStatus: - """Object tracking the status of a delete room request +PURGE_HISTORY_ACTION_NAME = "purge_history" - This class contains information on the progress of a delete room request, for - return by get_delete_status. - """ +PURGE_ROOM_ACTION_NAME = "purge_room" - STATUS_PURGING = 0 - STATUS_COMPLETE = 1 - STATUS_FAILED = 2 - STATUS_SHUTTING_DOWN = 3 - - STATUS_TEXT = { - STATUS_PURGING: "purging", - STATUS_COMPLETE: "complete", - STATUS_FAILED: "failed", - STATUS_SHUTTING_DOWN: "shutting_down", - } - - # Tracks whether this request has completed. - # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}. - status: int = STATUS_PURGING - - # Save the error message if an error occurs - error: str = "" - - # Saves the result of an action to give it back to REST API - shutdown_room: ShutdownRoomResponse = { - "kicked_users": [], - "failed_to_kick_users": [], - "local_aliases": [], - "new_room_id": None, - } - - def asdict(self) -> JsonDict: - ret = { - "status": DeleteStatus.STATUS_TEXT[self.status], - "shutdown_room": self.shutdown_room, - } - if self.error: - ret["error"] = self.error - return ret +SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room" class PaginationHandler: @@ -136,9 +71,6 @@ class PaginationHandler: paginating during a purge. """ - # when to remove a completed deletion/purge from the results map - CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours - def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() @@ -150,17 +82,11 @@ class PaginationHandler: self._room_shutdown_handler = hs.get_room_shutdown_handler() self._relations_handler = hs.get_relations_handler() self._worker_locks = hs.get_worker_locks_handler() + self._task_scheduler = hs.get_task_scheduler() self.pagination_lock = ReadWriteLock() # IDs of rooms in which there currently an active purge *or delete* operation. self._purges_in_progress_by_room: Set[str] = set() - # map from purge id to PurgeStatus - self._purges_by_id: Dict[str, PurgeStatus] = {} - # map from purge id to DeleteStatus - self._delete_by_id: Dict[str, DeleteStatus] = {} - # map from room id to delete ids - # Dict[`room_id`, List[`delete_id`]] - self._delete_by_room: Dict[str, List[str]] = {} self._event_serializer = hs.get_event_client_serializer() self._retention_default_max_lifetime = ( @@ -173,6 +99,9 @@ class PaginationHandler: self._retention_allowed_lifetime_max = ( hs.config.retention.retention_allowed_lifetime_max ) + self._forgotten_room_retention_period = ( + hs.config.server.forgotten_room_retention_period + ) self._is_master = hs.config.worker.worker_app is None if hs.config.retention.retention_enabled and self._is_master: @@ -189,6 +118,14 @@ class PaginationHandler: job.longest_max_lifetime, ) + self._task_scheduler.register_action( + self._purge_history, PURGE_HISTORY_ACTION_NAME + ) + self._task_scheduler.register_action(self._purge_room, PURGE_ROOM_ACTION_NAME) + self._task_scheduler.register_action( + self._shutdown_and_purge_room, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME + ) + async def purge_history_for_rooms_in_range( self, min_ms: Optional[int], max_ms: Optional[int] ) -> None: @@ -224,7 +161,7 @@ class PaginationHandler: include_null = False logger.info( - "[purge] Running purge job for %s < max_lifetime <= %s (include NULLs = %s)", + "[purge] Running retention purge job for %s < max_lifetime <= %s (include NULLs = %s)", min_ms, max_ms, include_null, @@ -239,10 +176,10 @@ class PaginationHandler: for room_id, retention_policy in rooms.items(): logger.info("[purge] Attempting to purge messages in room %s", room_id) - if room_id in self._purges_in_progress_by_room: + if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0: logger.warning( - "[purge] not purging room %s as there's an ongoing purge running" - " for this room", + "[purge] not purging room %s for retention as there's an ongoing purge" + " running for this room", room_id, ) continue @@ -295,27 +232,20 @@ class PaginationHandler: (stream, topo, _event_id) = r token = "t%d-%d" % (topo, stream) - purge_id = random_string(16) - - self._purges_by_id[purge_id] = PurgeStatus() - - logger.info( - "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id) - ) + logger.info("Starting purging events in room %s", room_id) # We want to purge everything, including local events, and to run the purge in # the background so that it's not blocking any other operation apart from # other purges in the same room. run_as_background_process( - "_purge_history", - self._purge_history, - purge_id, + PURGE_HISTORY_ACTION_NAME, + self.purge_history, room_id, token, True, ) - def start_purge_history( + async def start_purge_history( self, room_id: str, token: str, delete_local_events: bool = False ) -> str: """Start off a history purge on a room. @@ -329,40 +259,58 @@ class PaginationHandler: Returns: unique ID for this purge transaction. """ - if room_id in self._purges_in_progress_by_room: - raise SynapseError( - 400, "History purge already in progress for %s" % (room_id,) - ) - - purge_id = random_string(16) + purge_id = await self._task_scheduler.schedule_task( + PURGE_HISTORY_ACTION_NAME, + resource_id=room_id, + params={"token": token, "delete_local_events": delete_local_events}, + ) # we log the purge_id here so that it can be tied back to the # request id in the log lines. logger.info("[purge] starting purge_id %s", purge_id) - self._purges_by_id[purge_id] = PurgeStatus() - run_as_background_process( - "purge_history", - self._purge_history, - purge_id, - room_id, - token, - delete_local_events, - ) return purge_id async def _purge_history( - self, purge_id: str, room_id: str, token: str, delete_local_events: bool - ) -> None: + self, + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + """ + Scheduler action to purge some history of a room. + """ + if ( + task.resource_id is None + or task.params is None + or "token" not in task.params + or "delete_local_events" not in task.params + ): + return ( + TaskStatus.FAILED, + None, + "Not enough parameters passed to _purge_history", + ) + err = await self.purge_history( + task.resource_id, + task.params["token"], + task.params["delete_local_events"], + ) + if err is not None: + return TaskStatus.FAILED, None, err + return TaskStatus.COMPLETE, None, None + + async def purge_history( + self, + room_id: str, + token: str, + delete_local_events: bool, + ) -> Optional[str]: """Carry out a history purge on a room. Args: - purge_id: The ID for this purge. room_id: The room to purge from token: topological token to delete events before delete_local_events: True to delete local events as well as remote ones """ - self._purges_in_progress_by_room.add(room_id) try: async with self._worker_locks.acquire_read_write_lock( PURGE_PAGINATION_LOCK_NAME, room_id, write=True @@ -371,57 +319,68 @@ class PaginationHandler: room_id, token, delete_local_events ) logger.info("[purge] complete") - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE + return None except Exception: f = Failure() logger.error( "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject()) ) - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED - self._purges_by_id[purge_id].error = f.getErrorMessage() - finally: - self._purges_in_progress_by_room.discard(room_id) - - # remove the purge from the list 24 hours after it completes - def clear_purge() -> None: - del self._purges_by_id[purge_id] - - self.hs.get_reactor().callLater( - PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge - ) - - def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]: - """Get the current status of an active purge + return f.getErrorMessage() - Args: - purge_id: purge_id returned by start_purge_history - """ - return self._purges_by_id.get(purge_id) - - def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]: + async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]: """Get the current status of an active deleting Args: delete_id: delete_id returned by start_shutdown_and_purge_room + or start_purge_history. """ - return self._delete_by_id.get(delete_id) + return await self._task_scheduler.get_task(delete_id) - def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]: - """Get all active delete ids by room + async def get_delete_tasks_by_room( + self, room_id: str, only_active: Optional[bool] = False + ) -> List[ScheduledTask]: + """Get complete, failed or active delete tasks by room Args: room_id: room_id that is deleted + only_active: if True, completed&failed tasks will be omitted + """ + statuses = [TaskStatus.ACTIVE] + if not only_active: + statuses += [TaskStatus.COMPLETE, TaskStatus.FAILED] + + return await self._task_scheduler.get_tasks( + actions=[PURGE_ROOM_ACTION_NAME, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME], + resource_id=room_id, + statuses=statuses, + ) + + async def _purge_room( + self, + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + """ + Scheduler action to purge a room. """ - return self._delete_by_room.get(room_id) + if not task.resource_id: + raise Exception("No room id passed to purge_room task") + params = task.params if task.params else {} + await self.purge_room(task.resource_id, params.get("force", False)) + return TaskStatus.COMPLETE, None, None - async def purge_room(self, room_id: str, force: bool = False) -> None: + async def purge_room( + self, + room_id: str, + force: bool, + ) -> None: """Purge the given room from the database. - This function is part the delete room v1 API. Args: room_id: room to be purged force: set true to skip checking for joined users. """ + logger.info("starting purge room_id=%s force=%s", room_id, force) + async with self._worker_locks.acquire_multi_read_write_lock( [ (PURGE_PAGINATION_LOCK_NAME, room_id), @@ -430,13 +389,20 @@ class PaginationHandler: write=True, ): # first check that we have no users in this room - if not force: - joined = await self.store.is_host_joined(room_id, self._server_name) - if joined: + joined = await self.store.is_host_joined(room_id, self._server_name) + if joined: + if force: + logger.info( + "force-purging room %s with some local users still joined", + room_id, + ) + else: raise SynapseError(400, "Users are still joined to this room") await self._storage_controllers.purge_events.purge_room(room_id) + logger.info("purge complete for room_id %s", room_id) + @trace async def get_messages( self, @@ -711,177 +677,72 @@ class PaginationHandler: async def _shutdown_and_purge_room( self, - delete_id: str, - room_id: str, - requester_user_id: Optional[str], - new_room_user_id: Optional[str] = None, - new_room_name: Optional[str] = None, - message: Optional[str] = None, - block: bool = False, - purge: bool = True, - force_purge: bool = False, - ) -> None: + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: """ - Shuts down and purges a room. - - See `RoomShutdownHandler.shutdown_room` for details of creation of the new room - - Args: - delete_id: The ID for this delete. - room_id: The ID of the room to shut down. - requester_user_id: - User who requested the action. Will be recorded as putting the room on the - blocking list. - If None, the action was not manually requested but instead - triggered automatically, e.g. through a Synapse module - or some other policy. - MUST NOT be None if block=True. - new_room_user_id: - If set, a new room will be created with this user ID - as the creator and admin, and all users in the old room will be - moved into that room. If not set, no new room will be created - and the users will just be removed from the old room. - new_room_name: - A string representing the name of the room that new users will - be invited to. Defaults to `Content Violation Notification` - message: - A string containing the first message that will be sent as - `new_room_user_id` in the new room. Ideally this will clearly - convey why the original room was shut down. - Defaults to `Sharing illegal content on this server is not - permitted and rooms in violation will be blocked.` - block: - If set to `true`, this room will be added to a blocking list, - preventing future attempts to join the room. Defaults to `false`. - purge: - If set to `true`, purge the given room from the database. - force_purge: - If set to `true`, the room will be purged from database - also if it fails to remove some users from room. - - Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`: + Scheduler action to shutdown and purge a room. """ + if task.resource_id is None or task.params is None: + raise Exception( + "No room id and/or no parameters passed to shutdown_and_purge_room task" + ) - self._purges_in_progress_by_room.add(room_id) - try: - async with self._worker_locks.acquire_read_write_lock( - PURGE_PAGINATION_LOCK_NAME, room_id, write=True - ): - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN - self._delete_by_id[ - delete_id - ].shutdown_room = await self._room_shutdown_handler.shutdown_room( - room_id=room_id, - requester_user_id=requester_user_id, - new_room_user_id=new_room_user_id, - new_room_name=new_room_name, - message=message, - block=block, - ) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING + room_id = task.resource_id - if purge: - logger.info("starting purge room_id %s", room_id) + async def update_result(result: Optional[JsonMapping]) -> None: + await self._task_scheduler.update_task(task.id, result=result) - # first check that we have no users in this room - if not force_purge: - joined = await self.store.is_host_joined( - room_id, self._server_name - ) - if joined: - raise SynapseError( - 400, "Users are still joined to this room" - ) + shutdown_result = ( + cast(ShutdownRoomResponse, task.result) if task.result else None + ) - await self._storage_controllers.purge_events.purge_room(room_id) + shutdown_result = await self._room_shutdown_handler.shutdown_room( + room_id, + cast(ShutdownRoomParams, task.params), + shutdown_result, + update_result, + ) - logger.info("purge complete for room_id %s", room_id) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE - except Exception: - f = Failure() - logger.error( - "failed", - exc_info=(f.type, f.value, f.getTracebackObject()), - ) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED - self._delete_by_id[delete_id].error = f.getErrorMessage() - finally: - self._purges_in_progress_by_room.discard(room_id) - - # remove the delete from the list 24 hours after it completes - def clear_delete() -> None: - del self._delete_by_id[delete_id] - self._delete_by_room[room_id].remove(delete_id) - if not self._delete_by_room[room_id]: - del self._delete_by_room[room_id] - - self.hs.get_reactor().callLater( - PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete + if task.params.get("purge", False): + await self.purge_room( + room_id, + task.params.get("force_purge", False), ) - def start_shutdown_and_purge_room( + return (TaskStatus.COMPLETE, shutdown_result, None) + + async def start_shutdown_and_purge_room( self, room_id: str, - requester_user_id: Optional[str], - new_room_user_id: Optional[str] = None, - new_room_name: Optional[str] = None, - message: Optional[str] = None, - block: bool = False, - purge: bool = True, - force_purge: bool = False, + shutdown_params: ShutdownRoomParams, ) -> str: """Start off shut down and purge on a room. Args: room_id: The ID of the room to shut down. - requester_user_id: - User who requested the action and put the room on the - blocking list. - If None, the action was not manually requested but instead - triggered automatically, e.g. through a Synapse module - or some other policy. - MUST NOT be None if block=True. - new_room_user_id: - If set, a new room will be created with this user ID - as the creator and admin, and all users in the old room will be - moved into that room. If not set, no new room will be created - and the users will just be removed from the old room. - new_room_name: - A string representing the name of the room that new users will - be invited to. Defaults to `Content Violation Notification` - message: - A string containing the first message that will be sent as - `new_room_user_id` in the new room. Ideally this will clearly - convey why the original room was shut down. - Defaults to `Sharing illegal content on this server is not - permitted and rooms in violation will be blocked.` - block: - If set to `true`, this room will be added to a blocking list, - preventing future attempts to join the room. Defaults to `false`. - purge: - If set to `true`, purge the given room from the database. - force_purge: - If set to `true`, the room will be purged from database - also if it fails to remove some users from room. + shutdown_params: parameters for the shutdown Returns: unique ID for this delete transaction. """ - if room_id in self._purges_in_progress_by_room: - raise SynapseError( - 400, "History purge already in progress for %s" % (room_id,) - ) + if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0: + raise SynapseError(400, "Purge already in progress for %s" % (room_id,)) # This check is double to `RoomShutdownHandler.shutdown_room` # But here the requester get a direct response / error with HTTP request # and do not have to check the purge status + new_room_user_id = shutdown_params["new_room_user_id"] if new_room_user_id is not None: if not self.hs.is_mine_id(new_room_user_id): raise SynapseError( 400, "User must be our own: %s" % (new_room_user_id,) ) - delete_id = random_string(16) + delete_id = await self._task_scheduler.schedule_task( + SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME, + resource_id=room_id, + params=shutdown_params, + ) # we log the delete_id here so that it can be tied back to the # request id in the log lines. @@ -891,19 +752,4 @@ class PaginationHandler: delete_id, ) - self._delete_by_id[delete_id] = DeleteStatus() - self._delete_by_room.setdefault(room_id, []).append(delete_id) - run_as_background_process( - "shutdown_and_purge_room", - self._shutdown_and_purge_room, - delete_id, - room_id, - requester_user_id, - new_room_user_id, - new_room_name, - message, - block, - purge, - force_purge, - ) return delete_id |