diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 19cf5a2b43..878f267a4e 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -13,9 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Set
-
-import attr
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple, cast
from twisted.python.failure import Failure
@@ -23,16 +21,22 @@ from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
-from synapse.handlers.room import ShutdownRoomResponse
+from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType
+from synapse.types import (
+ JsonDict,
+ JsonMapping,
+ Requester,
+ ScheduledTask,
+ StreamKeyType,
+ TaskStatus,
+)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import ReadWriteLock
-from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -53,80 +57,11 @@ BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock"
-@attr.s(slots=True, auto_attribs=True)
-class PurgeStatus:
- """Object tracking the status of a purge request
-
- This class contains information on the progress of a purge request, for
- return by get_purge_status.
- """
-
- STATUS_ACTIVE = 0
- STATUS_COMPLETE = 1
- STATUS_FAILED = 2
-
- STATUS_TEXT = {
- STATUS_ACTIVE: "active",
- STATUS_COMPLETE: "complete",
- STATUS_FAILED: "failed",
- }
-
- # Save the error message if an error occurs
- error: str = ""
-
- # Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
- status: int = STATUS_ACTIVE
-
- def asdict(self) -> JsonDict:
- ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
- if self.error:
- ret["error"] = self.error
- return ret
-
-
-@attr.s(slots=True, auto_attribs=True)
-class DeleteStatus:
- """Object tracking the status of a delete room request
+PURGE_HISTORY_ACTION_NAME = "purge_history"
- This class contains information on the progress of a delete room request, for
- return by get_delete_status.
- """
+PURGE_ROOM_ACTION_NAME = "purge_room"
- STATUS_PURGING = 0
- STATUS_COMPLETE = 1
- STATUS_FAILED = 2
- STATUS_SHUTTING_DOWN = 3
-
- STATUS_TEXT = {
- STATUS_PURGING: "purging",
- STATUS_COMPLETE: "complete",
- STATUS_FAILED: "failed",
- STATUS_SHUTTING_DOWN: "shutting_down",
- }
-
- # Tracks whether this request has completed.
- # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
- status: int = STATUS_PURGING
-
- # Save the error message if an error occurs
- error: str = ""
-
- # Saves the result of an action to give it back to REST API
- shutdown_room: ShutdownRoomResponse = {
- "kicked_users": [],
- "failed_to_kick_users": [],
- "local_aliases": [],
- "new_room_id": None,
- }
-
- def asdict(self) -> JsonDict:
- ret = {
- "status": DeleteStatus.STATUS_TEXT[self.status],
- "shutdown_room": self.shutdown_room,
- }
- if self.error:
- ret["error"] = self.error
- return ret
+SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room"
class PaginationHandler:
@@ -136,9 +71,6 @@ class PaginationHandler:
paginating during a purge.
"""
- # when to remove a completed deletion/purge from the results map
- CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
-
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
@@ -150,17 +82,11 @@ class PaginationHandler:
self._room_shutdown_handler = hs.get_room_shutdown_handler()
self._relations_handler = hs.get_relations_handler()
self._worker_locks = hs.get_worker_locks_handler()
+ self._task_scheduler = hs.get_task_scheduler()
self.pagination_lock = ReadWriteLock()
# IDs of rooms in which there currently an active purge *or delete* operation.
self._purges_in_progress_by_room: Set[str] = set()
- # map from purge id to PurgeStatus
- self._purges_by_id: Dict[str, PurgeStatus] = {}
- # map from purge id to DeleteStatus
- self._delete_by_id: Dict[str, DeleteStatus] = {}
- # map from room id to delete ids
- # Dict[`room_id`, List[`delete_id`]]
- self._delete_by_room: Dict[str, List[str]] = {}
self._event_serializer = hs.get_event_client_serializer()
self._retention_default_max_lifetime = (
@@ -173,6 +99,9 @@ class PaginationHandler:
self._retention_allowed_lifetime_max = (
hs.config.retention.retention_allowed_lifetime_max
)
+ self._forgotten_room_retention_period = (
+ hs.config.server.forgotten_room_retention_period
+ )
self._is_master = hs.config.worker.worker_app is None
if hs.config.retention.retention_enabled and self._is_master:
@@ -189,6 +118,14 @@ class PaginationHandler:
job.longest_max_lifetime,
)
+ self._task_scheduler.register_action(
+ self._purge_history, PURGE_HISTORY_ACTION_NAME
+ )
+ self._task_scheduler.register_action(self._purge_room, PURGE_ROOM_ACTION_NAME)
+ self._task_scheduler.register_action(
+ self._shutdown_and_purge_room, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME
+ )
+
async def purge_history_for_rooms_in_range(
self, min_ms: Optional[int], max_ms: Optional[int]
) -> None:
@@ -224,7 +161,7 @@ class PaginationHandler:
include_null = False
logger.info(
- "[purge] Running purge job for %s < max_lifetime <= %s (include NULLs = %s)",
+ "[purge] Running retention purge job for %s < max_lifetime <= %s (include NULLs = %s)",
min_ms,
max_ms,
include_null,
@@ -239,10 +176,10 @@ class PaginationHandler:
for room_id, retention_policy in rooms.items():
logger.info("[purge] Attempting to purge messages in room %s", room_id)
- if room_id in self._purges_in_progress_by_room:
+ if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0:
logger.warning(
- "[purge] not purging room %s as there's an ongoing purge running"
- " for this room",
+ "[purge] not purging room %s for retention as there's an ongoing purge"
+ " running for this room",
room_id,
)
continue
@@ -295,27 +232,20 @@ class PaginationHandler:
(stream, topo, _event_id) = r
token = "t%d-%d" % (topo, stream)
- purge_id = random_string(16)
-
- self._purges_by_id[purge_id] = PurgeStatus()
-
- logger.info(
- "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
- )
+ logger.info("Starting purging events in room %s", room_id)
# We want to purge everything, including local events, and to run the purge in
# the background so that it's not blocking any other operation apart from
# other purges in the same room.
run_as_background_process(
- "_purge_history",
- self._purge_history,
- purge_id,
+ PURGE_HISTORY_ACTION_NAME,
+ self.purge_history,
room_id,
token,
True,
)
- def start_purge_history(
+ async def start_purge_history(
self, room_id: str, token: str, delete_local_events: bool = False
) -> str:
"""Start off a history purge on a room.
@@ -329,40 +259,58 @@ class PaginationHandler:
Returns:
unique ID for this purge transaction.
"""
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400, "History purge already in progress for %s" % (room_id,)
- )
-
- purge_id = random_string(16)
+ purge_id = await self._task_scheduler.schedule_task(
+ PURGE_HISTORY_ACTION_NAME,
+ resource_id=room_id,
+ params={"token": token, "delete_local_events": delete_local_events},
+ )
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger.info("[purge] starting purge_id %s", purge_id)
- self._purges_by_id[purge_id] = PurgeStatus()
- run_as_background_process(
- "purge_history",
- self._purge_history,
- purge_id,
- room_id,
- token,
- delete_local_events,
- )
return purge_id
async def _purge_history(
- self, purge_id: str, room_id: str, token: str, delete_local_events: bool
- ) -> None:
+ self,
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ """
+ Scheduler action to purge some history of a room.
+ """
+ if (
+ task.resource_id is None
+ or task.params is None
+ or "token" not in task.params
+ or "delete_local_events" not in task.params
+ ):
+ return (
+ TaskStatus.FAILED,
+ None,
+ "Not enough parameters passed to _purge_history",
+ )
+ err = await self.purge_history(
+ task.resource_id,
+ task.params["token"],
+ task.params["delete_local_events"],
+ )
+ if err is not None:
+ return TaskStatus.FAILED, None, err
+ return TaskStatus.COMPLETE, None, None
+
+ async def purge_history(
+ self,
+ room_id: str,
+ token: str,
+ delete_local_events: bool,
+ ) -> Optional[str]:
"""Carry out a history purge on a room.
Args:
- purge_id: The ID for this purge.
room_id: The room to purge from
token: topological token to delete events before
delete_local_events: True to delete local events as well as remote ones
"""
- self._purges_in_progress_by_room.add(room_id)
try:
async with self._worker_locks.acquire_read_write_lock(
PURGE_PAGINATION_LOCK_NAME, room_id, write=True
@@ -371,57 +319,68 @@ class PaginationHandler:
room_id, token, delete_local_events
)
logger.info("[purge] complete")
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+ return None
except Exception:
f = Failure()
logger.error(
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
)
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
- self._purges_by_id[purge_id].error = f.getErrorMessage()
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- # remove the purge from the list 24 hours after it completes
- def clear_purge() -> None:
- del self._purges_by_id[purge_id]
-
- self.hs.get_reactor().callLater(
- PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
- )
-
- def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
- """Get the current status of an active purge
+ return f.getErrorMessage()
- Args:
- purge_id: purge_id returned by start_purge_history
- """
- return self._purges_by_id.get(purge_id)
-
- def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
+ async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]:
"""Get the current status of an active deleting
Args:
delete_id: delete_id returned by start_shutdown_and_purge_room
+ or start_purge_history.
"""
- return self._delete_by_id.get(delete_id)
+ return await self._task_scheduler.get_task(delete_id)
- def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
- """Get all active delete ids by room
+ async def get_delete_tasks_by_room(
+ self, room_id: str, only_active: Optional[bool] = False
+ ) -> List[ScheduledTask]:
+ """Get complete, failed or active delete tasks by room
Args:
room_id: room_id that is deleted
+ only_active: if True, completed&failed tasks will be omitted
+ """
+ statuses = [TaskStatus.ACTIVE]
+ if not only_active:
+ statuses += [TaskStatus.COMPLETE, TaskStatus.FAILED]
+
+ return await self._task_scheduler.get_tasks(
+ actions=[PURGE_ROOM_ACTION_NAME, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME],
+ resource_id=room_id,
+ statuses=statuses,
+ )
+
+ async def _purge_room(
+ self,
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ """
+ Scheduler action to purge a room.
"""
- return self._delete_by_room.get(room_id)
+ if not task.resource_id:
+ raise Exception("No room id passed to purge_room task")
+ params = task.params if task.params else {}
+ await self.purge_room(task.resource_id, params.get("force", False))
+ return TaskStatus.COMPLETE, None, None
- async def purge_room(self, room_id: str, force: bool = False) -> None:
+ async def purge_room(
+ self,
+ room_id: str,
+ force: bool,
+ ) -> None:
"""Purge the given room from the database.
- This function is part the delete room v1 API.
Args:
room_id: room to be purged
force: set true to skip checking for joined users.
"""
+ logger.info("starting purge room_id=%s force=%s", room_id, force)
+
async with self._worker_locks.acquire_multi_read_write_lock(
[
(PURGE_PAGINATION_LOCK_NAME, room_id),
@@ -430,13 +389,20 @@ class PaginationHandler:
write=True,
):
# first check that we have no users in this room
- if not force:
- joined = await self.store.is_host_joined(room_id, self._server_name)
- if joined:
+ joined = await self.store.is_host_joined(room_id, self._server_name)
+ if joined:
+ if force:
+ logger.info(
+ "force-purging room %s with some local users still joined",
+ room_id,
+ )
+ else:
raise SynapseError(400, "Users are still joined to this room")
await self._storage_controllers.purge_events.purge_room(room_id)
+ logger.info("purge complete for room_id %s", room_id)
+
@trace
async def get_messages(
self,
@@ -711,177 +677,72 @@ class PaginationHandler:
async def _shutdown_and_purge_room(
self,
- delete_id: str,
- room_id: str,
- requester_user_id: Optional[str],
- new_room_user_id: Optional[str] = None,
- new_room_name: Optional[str] = None,
- message: Optional[str] = None,
- block: bool = False,
- purge: bool = True,
- force_purge: bool = False,
- ) -> None:
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
"""
- Shuts down and purges a room.
-
- See `RoomShutdownHandler.shutdown_room` for details of creation of the new room
-
- Args:
- delete_id: The ID for this delete.
- room_id: The ID of the room to shut down.
- requester_user_id:
- User who requested the action. Will be recorded as putting the room on the
- blocking list.
- If None, the action was not manually requested but instead
- triggered automatically, e.g. through a Synapse module
- or some other policy.
- MUST NOT be None if block=True.
- new_room_user_id:
- If set, a new room will be created with this user ID
- as the creator and admin, and all users in the old room will be
- moved into that room. If not set, no new room will be created
- and the users will just be removed from the old room.
- new_room_name:
- A string representing the name of the room that new users will
- be invited to. Defaults to `Content Violation Notification`
- message:
- A string containing the first message that will be sent as
- `new_room_user_id` in the new room. Ideally this will clearly
- convey why the original room was shut down.
- Defaults to `Sharing illegal content on this server is not
- permitted and rooms in violation will be blocked.`
- block:
- If set to `true`, this room will be added to a blocking list,
- preventing future attempts to join the room. Defaults to `false`.
- purge:
- If set to `true`, purge the given room from the database.
- force_purge:
- If set to `true`, the room will be purged from database
- also if it fails to remove some users from room.
-
- Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
+ Scheduler action to shutdown and purge a room.
"""
+ if task.resource_id is None or task.params is None:
+ raise Exception(
+ "No room id and/or no parameters passed to shutdown_and_purge_room task"
+ )
- self._purges_in_progress_by_room.add(room_id)
- try:
- async with self._worker_locks.acquire_read_write_lock(
- PURGE_PAGINATION_LOCK_NAME, room_id, write=True
- ):
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
- self._delete_by_id[
- delete_id
- ].shutdown_room = await self._room_shutdown_handler.shutdown_room(
- room_id=room_id,
- requester_user_id=requester_user_id,
- new_room_user_id=new_room_user_id,
- new_room_name=new_room_name,
- message=message,
- block=block,
- )
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
+ room_id = task.resource_id
- if purge:
- logger.info("starting purge room_id %s", room_id)
+ async def update_result(result: Optional[JsonMapping]) -> None:
+ await self._task_scheduler.update_task(task.id, result=result)
- # first check that we have no users in this room
- if not force_purge:
- joined = await self.store.is_host_joined(
- room_id, self._server_name
- )
- if joined:
- raise SynapseError(
- 400, "Users are still joined to this room"
- )
+ shutdown_result = (
+ cast(ShutdownRoomResponse, task.result) if task.result else None
+ )
- await self._storage_controllers.purge_events.purge_room(room_id)
+ shutdown_result = await self._room_shutdown_handler.shutdown_room(
+ room_id,
+ cast(ShutdownRoomParams, task.params),
+ shutdown_result,
+ update_result,
+ )
- logger.info("purge complete for room_id %s", room_id)
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
- except Exception:
- f = Failure()
- logger.error(
- "failed",
- exc_info=(f.type, f.value, f.getTracebackObject()),
- )
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
- self._delete_by_id[delete_id].error = f.getErrorMessage()
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- # remove the delete from the list 24 hours after it completes
- def clear_delete() -> None:
- del self._delete_by_id[delete_id]
- self._delete_by_room[room_id].remove(delete_id)
- if not self._delete_by_room[room_id]:
- del self._delete_by_room[room_id]
-
- self.hs.get_reactor().callLater(
- PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
+ if task.params.get("purge", False):
+ await self.purge_room(
+ room_id,
+ task.params.get("force_purge", False),
)
- def start_shutdown_and_purge_room(
+ return (TaskStatus.COMPLETE, shutdown_result, None)
+
+ async def start_shutdown_and_purge_room(
self,
room_id: str,
- requester_user_id: Optional[str],
- new_room_user_id: Optional[str] = None,
- new_room_name: Optional[str] = None,
- message: Optional[str] = None,
- block: bool = False,
- purge: bool = True,
- force_purge: bool = False,
+ shutdown_params: ShutdownRoomParams,
) -> str:
"""Start off shut down and purge on a room.
Args:
room_id: The ID of the room to shut down.
- requester_user_id:
- User who requested the action and put the room on the
- blocking list.
- If None, the action was not manually requested but instead
- triggered automatically, e.g. through a Synapse module
- or some other policy.
- MUST NOT be None if block=True.
- new_room_user_id:
- If set, a new room will be created with this user ID
- as the creator and admin, and all users in the old room will be
- moved into that room. If not set, no new room will be created
- and the users will just be removed from the old room.
- new_room_name:
- A string representing the name of the room that new users will
- be invited to. Defaults to `Content Violation Notification`
- message:
- A string containing the first message that will be sent as
- `new_room_user_id` in the new room. Ideally this will clearly
- convey why the original room was shut down.
- Defaults to `Sharing illegal content on this server is not
- permitted and rooms in violation will be blocked.`
- block:
- If set to `true`, this room will be added to a blocking list,
- preventing future attempts to join the room. Defaults to `false`.
- purge:
- If set to `true`, purge the given room from the database.
- force_purge:
- If set to `true`, the room will be purged from database
- also if it fails to remove some users from room.
+ shutdown_params: parameters for the shutdown
Returns:
unique ID for this delete transaction.
"""
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400, "History purge already in progress for %s" % (room_id,)
- )
+ if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0:
+ raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
# This check is double to `RoomShutdownHandler.shutdown_room`
# But here the requester get a direct response / error with HTTP request
# and do not have to check the purge status
+ new_room_user_id = shutdown_params["new_room_user_id"]
if new_room_user_id is not None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
400, "User must be our own: %s" % (new_room_user_id,)
)
- delete_id = random_string(16)
+ delete_id = await self._task_scheduler.schedule_task(
+ SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME,
+ resource_id=room_id,
+ params=shutdown_params,
+ )
# we log the delete_id here so that it can be tied back to the
# request id in the log lines.
@@ -891,19 +752,4 @@ class PaginationHandler:
delete_id,
)
- self._delete_by_id[delete_id] = DeleteStatus()
- self._delete_by_room.setdefault(room_id, []).append(delete_id)
- run_as_background_process(
- "shutdown_and_purge_room",
- self._shutdown_and_purge_room,
- delete_id,
- room_id,
- requester_user_id,
- new_room_user_id,
- new_room_name,
- message,
- block,
- purge,
- force_purge,
- )
return delete_id
|