diff --git a/changelog.d/15488.feature b/changelog.d/15488.feature
new file mode 100644
index 0000000000..8684d84192
--- /dev/null
+++ b/changelog.d/15488.feature
@@ -0,0 +1 @@
+Add automatic purge after all users forgotten a room. Also add restore of purge/shutdown rooms after a synapse restart.
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index a06b3d8a06..885a7bf0a3 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -936,6 +936,17 @@ Example configuration:
redaction_retention_period: 28d
```
---
+### `forgotten_room_retention_period`
+
+How long to keep locally forgotten rooms before purging them from the DB.
+
+Defaults to `null`, meaning it's disabled.
+
+Example configuration:
+```yaml
+forgotten_room_retention_period: 28d
+```
+---
### `user_ips_max_age`
How long to track users' last seen time and IPs in the database.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index d25e3548e0..f7c80eee21 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -77,6 +77,7 @@ from synapse.storage.databases.main.monthly_active_users import (
)
from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.profile import ProfileWorkerStore
+from synapse.storage.databases.main.purge_events import PurgeEventsStore
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
from synapse.storage.databases.main.pusher import PusherWorkerStore
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
@@ -134,6 +135,7 @@ class GenericWorkerStore(
RelationsWorkerStore,
EventFederationWorkerStore,
EventPushActionsWorkerStore,
+ PurgeEventsStore,
StateGroupWorkerStore,
SignatureWorkerStore,
UserErasureWorkerStore,
diff --git a/synapse/config/server.py b/synapse/config/server.py
index b46fa51593..72d30da300 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -486,6 +486,17 @@ class ServerConfig(Config):
else:
self.redaction_retention_period = None
+ # How long to keep locally forgotten rooms before purging them from the DB.
+ forgotten_room_retention_period = config.get(
+ "forgotten_room_retention_period", None
+ )
+ if forgotten_room_retention_period is not None:
+ self.forgotten_room_retention_period: Optional[int] = self.parse_duration(
+ forgotten_room_retention_period
+ )
+ else:
+ self.forgotten_room_retention_period = None
+
# How long to keep entries in the `users_ips` table.
user_ips_max_age = config.get("user_ips_max_age", "28d")
if user_ips_max_age is not None:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 19cf5a2b43..878f267a4e 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -13,9 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Set
-
-import attr
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple, cast
from twisted.python.failure import Failure
@@ -23,16 +21,22 @@ from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
-from synapse.handlers.room import ShutdownRoomResponse
+from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType
+from synapse.types import (
+ JsonDict,
+ JsonMapping,
+ Requester,
+ ScheduledTask,
+ StreamKeyType,
+ TaskStatus,
+)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import ReadWriteLock
-from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -53,80 +57,11 @@ BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
PURGE_PAGINATION_LOCK_NAME = "purge_pagination_lock"
-@attr.s(slots=True, auto_attribs=True)
-class PurgeStatus:
- """Object tracking the status of a purge request
-
- This class contains information on the progress of a purge request, for
- return by get_purge_status.
- """
-
- STATUS_ACTIVE = 0
- STATUS_COMPLETE = 1
- STATUS_FAILED = 2
-
- STATUS_TEXT = {
- STATUS_ACTIVE: "active",
- STATUS_COMPLETE: "complete",
- STATUS_FAILED: "failed",
- }
-
- # Save the error message if an error occurs
- error: str = ""
-
- # Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
- status: int = STATUS_ACTIVE
-
- def asdict(self) -> JsonDict:
- ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
- if self.error:
- ret["error"] = self.error
- return ret
-
-
-@attr.s(slots=True, auto_attribs=True)
-class DeleteStatus:
- """Object tracking the status of a delete room request
+PURGE_HISTORY_ACTION_NAME = "purge_history"
- This class contains information on the progress of a delete room request, for
- return by get_delete_status.
- """
+PURGE_ROOM_ACTION_NAME = "purge_room"
- STATUS_PURGING = 0
- STATUS_COMPLETE = 1
- STATUS_FAILED = 2
- STATUS_SHUTTING_DOWN = 3
-
- STATUS_TEXT = {
- STATUS_PURGING: "purging",
- STATUS_COMPLETE: "complete",
- STATUS_FAILED: "failed",
- STATUS_SHUTTING_DOWN: "shutting_down",
- }
-
- # Tracks whether this request has completed.
- # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
- status: int = STATUS_PURGING
-
- # Save the error message if an error occurs
- error: str = ""
-
- # Saves the result of an action to give it back to REST API
- shutdown_room: ShutdownRoomResponse = {
- "kicked_users": [],
- "failed_to_kick_users": [],
- "local_aliases": [],
- "new_room_id": None,
- }
-
- def asdict(self) -> JsonDict:
- ret = {
- "status": DeleteStatus.STATUS_TEXT[self.status],
- "shutdown_room": self.shutdown_room,
- }
- if self.error:
- ret["error"] = self.error
- return ret
+SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room"
class PaginationHandler:
@@ -136,9 +71,6 @@ class PaginationHandler:
paginating during a purge.
"""
- # when to remove a completed deletion/purge from the results map
- CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
-
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
@@ -150,17 +82,11 @@ class PaginationHandler:
self._room_shutdown_handler = hs.get_room_shutdown_handler()
self._relations_handler = hs.get_relations_handler()
self._worker_locks = hs.get_worker_locks_handler()
+ self._task_scheduler = hs.get_task_scheduler()
self.pagination_lock = ReadWriteLock()
# IDs of rooms in which there currently an active purge *or delete* operation.
self._purges_in_progress_by_room: Set[str] = set()
- # map from purge id to PurgeStatus
- self._purges_by_id: Dict[str, PurgeStatus] = {}
- # map from purge id to DeleteStatus
- self._delete_by_id: Dict[str, DeleteStatus] = {}
- # map from room id to delete ids
- # Dict[`room_id`, List[`delete_id`]]
- self._delete_by_room: Dict[str, List[str]] = {}
self._event_serializer = hs.get_event_client_serializer()
self._retention_default_max_lifetime = (
@@ -173,6 +99,9 @@ class PaginationHandler:
self._retention_allowed_lifetime_max = (
hs.config.retention.retention_allowed_lifetime_max
)
+ self._forgotten_room_retention_period = (
+ hs.config.server.forgotten_room_retention_period
+ )
self._is_master = hs.config.worker.worker_app is None
if hs.config.retention.retention_enabled and self._is_master:
@@ -189,6 +118,14 @@ class PaginationHandler:
job.longest_max_lifetime,
)
+ self._task_scheduler.register_action(
+ self._purge_history, PURGE_HISTORY_ACTION_NAME
+ )
+ self._task_scheduler.register_action(self._purge_room, PURGE_ROOM_ACTION_NAME)
+ self._task_scheduler.register_action(
+ self._shutdown_and_purge_room, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME
+ )
+
async def purge_history_for_rooms_in_range(
self, min_ms: Optional[int], max_ms: Optional[int]
) -> None:
@@ -224,7 +161,7 @@ class PaginationHandler:
include_null = False
logger.info(
- "[purge] Running purge job for %s < max_lifetime <= %s (include NULLs = %s)",
+ "[purge] Running retention purge job for %s < max_lifetime <= %s (include NULLs = %s)",
min_ms,
max_ms,
include_null,
@@ -239,10 +176,10 @@ class PaginationHandler:
for room_id, retention_policy in rooms.items():
logger.info("[purge] Attempting to purge messages in room %s", room_id)
- if room_id in self._purges_in_progress_by_room:
+ if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0:
logger.warning(
- "[purge] not purging room %s as there's an ongoing purge running"
- " for this room",
+ "[purge] not purging room %s for retention as there's an ongoing purge"
+ " running for this room",
room_id,
)
continue
@@ -295,27 +232,20 @@ class PaginationHandler:
(stream, topo, _event_id) = r
token = "t%d-%d" % (topo, stream)
- purge_id = random_string(16)
-
- self._purges_by_id[purge_id] = PurgeStatus()
-
- logger.info(
- "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
- )
+ logger.info("Starting purging events in room %s", room_id)
# We want to purge everything, including local events, and to run the purge in
# the background so that it's not blocking any other operation apart from
# other purges in the same room.
run_as_background_process(
- "_purge_history",
- self._purge_history,
- purge_id,
+ PURGE_HISTORY_ACTION_NAME,
+ self.purge_history,
room_id,
token,
True,
)
- def start_purge_history(
+ async def start_purge_history(
self, room_id: str, token: str, delete_local_events: bool = False
) -> str:
"""Start off a history purge on a room.
@@ -329,40 +259,58 @@ class PaginationHandler:
Returns:
unique ID for this purge transaction.
"""
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400, "History purge already in progress for %s" % (room_id,)
- )
-
- purge_id = random_string(16)
+ purge_id = await self._task_scheduler.schedule_task(
+ PURGE_HISTORY_ACTION_NAME,
+ resource_id=room_id,
+ params={"token": token, "delete_local_events": delete_local_events},
+ )
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger.info("[purge] starting purge_id %s", purge_id)
- self._purges_by_id[purge_id] = PurgeStatus()
- run_as_background_process(
- "purge_history",
- self._purge_history,
- purge_id,
- room_id,
- token,
- delete_local_events,
- )
return purge_id
async def _purge_history(
- self, purge_id: str, room_id: str, token: str, delete_local_events: bool
- ) -> None:
+ self,
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ """
+ Scheduler action to purge some history of a room.
+ """
+ if (
+ task.resource_id is None
+ or task.params is None
+ or "token" not in task.params
+ or "delete_local_events" not in task.params
+ ):
+ return (
+ TaskStatus.FAILED,
+ None,
+ "Not enough parameters passed to _purge_history",
+ )
+ err = await self.purge_history(
+ task.resource_id,
+ task.params["token"],
+ task.params["delete_local_events"],
+ )
+ if err is not None:
+ return TaskStatus.FAILED, None, err
+ return TaskStatus.COMPLETE, None, None
+
+ async def purge_history(
+ self,
+ room_id: str,
+ token: str,
+ delete_local_events: bool,
+ ) -> Optional[str]:
"""Carry out a history purge on a room.
Args:
- purge_id: The ID for this purge.
room_id: The room to purge from
token: topological token to delete events before
delete_local_events: True to delete local events as well as remote ones
"""
- self._purges_in_progress_by_room.add(room_id)
try:
async with self._worker_locks.acquire_read_write_lock(
PURGE_PAGINATION_LOCK_NAME, room_id, write=True
@@ -371,57 +319,68 @@ class PaginationHandler:
room_id, token, delete_local_events
)
logger.info("[purge] complete")
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+ return None
except Exception:
f = Failure()
logger.error(
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
)
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
- self._purges_by_id[purge_id].error = f.getErrorMessage()
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- # remove the purge from the list 24 hours after it completes
- def clear_purge() -> None:
- del self._purges_by_id[purge_id]
-
- self.hs.get_reactor().callLater(
- PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
- )
-
- def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
- """Get the current status of an active purge
+ return f.getErrorMessage()
- Args:
- purge_id: purge_id returned by start_purge_history
- """
- return self._purges_by_id.get(purge_id)
-
- def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
+ async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]:
"""Get the current status of an active deleting
Args:
delete_id: delete_id returned by start_shutdown_and_purge_room
+ or start_purge_history.
"""
- return self._delete_by_id.get(delete_id)
+ return await self._task_scheduler.get_task(delete_id)
- def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
- """Get all active delete ids by room
+ async def get_delete_tasks_by_room(
+ self, room_id: str, only_active: Optional[bool] = False
+ ) -> List[ScheduledTask]:
+ """Get complete, failed or active delete tasks by room
Args:
room_id: room_id that is deleted
+ only_active: if True, completed&failed tasks will be omitted
+ """
+ statuses = [TaskStatus.ACTIVE]
+ if not only_active:
+ statuses += [TaskStatus.COMPLETE, TaskStatus.FAILED]
+
+ return await self._task_scheduler.get_tasks(
+ actions=[PURGE_ROOM_ACTION_NAME, SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME],
+ resource_id=room_id,
+ statuses=statuses,
+ )
+
+ async def _purge_room(
+ self,
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ """
+ Scheduler action to purge a room.
"""
- return self._delete_by_room.get(room_id)
+ if not task.resource_id:
+ raise Exception("No room id passed to purge_room task")
+ params = task.params if task.params else {}
+ await self.purge_room(task.resource_id, params.get("force", False))
+ return TaskStatus.COMPLETE, None, None
- async def purge_room(self, room_id: str, force: bool = False) -> None:
+ async def purge_room(
+ self,
+ room_id: str,
+ force: bool,
+ ) -> None:
"""Purge the given room from the database.
- This function is part the delete room v1 API.
Args:
room_id: room to be purged
force: set true to skip checking for joined users.
"""
+ logger.info("starting purge room_id=%s force=%s", room_id, force)
+
async with self._worker_locks.acquire_multi_read_write_lock(
[
(PURGE_PAGINATION_LOCK_NAME, room_id),
@@ -430,13 +389,20 @@ class PaginationHandler:
write=True,
):
# first check that we have no users in this room
- if not force:
- joined = await self.store.is_host_joined(room_id, self._server_name)
- if joined:
+ joined = await self.store.is_host_joined(room_id, self._server_name)
+ if joined:
+ if force:
+ logger.info(
+ "force-purging room %s with some local users still joined",
+ room_id,
+ )
+ else:
raise SynapseError(400, "Users are still joined to this room")
await self._storage_controllers.purge_events.purge_room(room_id)
+ logger.info("purge complete for room_id %s", room_id)
+
@trace
async def get_messages(
self,
@@ -711,177 +677,72 @@ class PaginationHandler:
async def _shutdown_and_purge_room(
self,
- delete_id: str,
- room_id: str,
- requester_user_id: Optional[str],
- new_room_user_id: Optional[str] = None,
- new_room_name: Optional[str] = None,
- message: Optional[str] = None,
- block: bool = False,
- purge: bool = True,
- force_purge: bool = False,
- ) -> None:
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
"""
- Shuts down and purges a room.
-
- See `RoomShutdownHandler.shutdown_room` for details of creation of the new room
-
- Args:
- delete_id: The ID for this delete.
- room_id: The ID of the room to shut down.
- requester_user_id:
- User who requested the action. Will be recorded as putting the room on the
- blocking list.
- If None, the action was not manually requested but instead
- triggered automatically, e.g. through a Synapse module
- or some other policy.
- MUST NOT be None if block=True.
- new_room_user_id:
- If set, a new room will be created with this user ID
- as the creator and admin, and all users in the old room will be
- moved into that room. If not set, no new room will be created
- and the users will just be removed from the old room.
- new_room_name:
- A string representing the name of the room that new users will
- be invited to. Defaults to `Content Violation Notification`
- message:
- A string containing the first message that will be sent as
- `new_room_user_id` in the new room. Ideally this will clearly
- convey why the original room was shut down.
- Defaults to `Sharing illegal content on this server is not
- permitted and rooms in violation will be blocked.`
- block:
- If set to `true`, this room will be added to a blocking list,
- preventing future attempts to join the room. Defaults to `false`.
- purge:
- If set to `true`, purge the given room from the database.
- force_purge:
- If set to `true`, the room will be purged from database
- also if it fails to remove some users from room.
-
- Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
+ Scheduler action to shutdown and purge a room.
"""
+ if task.resource_id is None or task.params is None:
+ raise Exception(
+ "No room id and/or no parameters passed to shutdown_and_purge_room task"
+ )
- self._purges_in_progress_by_room.add(room_id)
- try:
- async with self._worker_locks.acquire_read_write_lock(
- PURGE_PAGINATION_LOCK_NAME, room_id, write=True
- ):
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
- self._delete_by_id[
- delete_id
- ].shutdown_room = await self._room_shutdown_handler.shutdown_room(
- room_id=room_id,
- requester_user_id=requester_user_id,
- new_room_user_id=new_room_user_id,
- new_room_name=new_room_name,
- message=message,
- block=block,
- )
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
+ room_id = task.resource_id
- if purge:
- logger.info("starting purge room_id %s", room_id)
+ async def update_result(result: Optional[JsonMapping]) -> None:
+ await self._task_scheduler.update_task(task.id, result=result)
- # first check that we have no users in this room
- if not force_purge:
- joined = await self.store.is_host_joined(
- room_id, self._server_name
- )
- if joined:
- raise SynapseError(
- 400, "Users are still joined to this room"
- )
+ shutdown_result = (
+ cast(ShutdownRoomResponse, task.result) if task.result else None
+ )
- await self._storage_controllers.purge_events.purge_room(room_id)
+ shutdown_result = await self._room_shutdown_handler.shutdown_room(
+ room_id,
+ cast(ShutdownRoomParams, task.params),
+ shutdown_result,
+ update_result,
+ )
- logger.info("purge complete for room_id %s", room_id)
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
- except Exception:
- f = Failure()
- logger.error(
- "failed",
- exc_info=(f.type, f.value, f.getTracebackObject()),
- )
- self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
- self._delete_by_id[delete_id].error = f.getErrorMessage()
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- # remove the delete from the list 24 hours after it completes
- def clear_delete() -> None:
- del self._delete_by_id[delete_id]
- self._delete_by_room[room_id].remove(delete_id)
- if not self._delete_by_room[room_id]:
- del self._delete_by_room[room_id]
-
- self.hs.get_reactor().callLater(
- PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
+ if task.params.get("purge", False):
+ await self.purge_room(
+ room_id,
+ task.params.get("force_purge", False),
)
- def start_shutdown_and_purge_room(
+ return (TaskStatus.COMPLETE, shutdown_result, None)
+
+ async def start_shutdown_and_purge_room(
self,
room_id: str,
- requester_user_id: Optional[str],
- new_room_user_id: Optional[str] = None,
- new_room_name: Optional[str] = None,
- message: Optional[str] = None,
- block: bool = False,
- purge: bool = True,
- force_purge: bool = False,
+ shutdown_params: ShutdownRoomParams,
) -> str:
"""Start off shut down and purge on a room.
Args:
room_id: The ID of the room to shut down.
- requester_user_id:
- User who requested the action and put the room on the
- blocking list.
- If None, the action was not manually requested but instead
- triggered automatically, e.g. through a Synapse module
- or some other policy.
- MUST NOT be None if block=True.
- new_room_user_id:
- If set, a new room will be created with this user ID
- as the creator and admin, and all users in the old room will be
- moved into that room. If not set, no new room will be created
- and the users will just be removed from the old room.
- new_room_name:
- A string representing the name of the room that new users will
- be invited to. Defaults to `Content Violation Notification`
- message:
- A string containing the first message that will be sent as
- `new_room_user_id` in the new room. Ideally this will clearly
- convey why the original room was shut down.
- Defaults to `Sharing illegal content on this server is not
- permitted and rooms in violation will be blocked.`
- block:
- If set to `true`, this room will be added to a blocking list,
- preventing future attempts to join the room. Defaults to `false`.
- purge:
- If set to `true`, purge the given room from the database.
- force_purge:
- If set to `true`, the room will be purged from database
- also if it fails to remove some users from room.
+ shutdown_params: parameters for the shutdown
Returns:
unique ID for this delete transaction.
"""
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400, "History purge already in progress for %s" % (room_id,)
- )
+ if len(await self.get_delete_tasks_by_room(room_id, only_active=True)) > 0:
+ raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
# This check is double to `RoomShutdownHandler.shutdown_room`
# But here the requester get a direct response / error with HTTP request
# and do not have to check the purge status
+ new_room_user_id = shutdown_params["new_room_user_id"]
if new_room_user_id is not None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
400, "User must be our own: %s" % (new_room_user_id,)
)
- delete_id = random_string(16)
+ delete_id = await self._task_scheduler.schedule_task(
+ SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME,
+ resource_id=room_id,
+ params=shutdown_params,
+ )
# we log the delete_id here so that it can be tied back to the
# request id in the log lines.
@@ -891,19 +752,4 @@ class PaginationHandler:
delete_id,
)
- self._delete_by_id[delete_id] = DeleteStatus()
- self._delete_by_room.setdefault(room_id, []).append(delete_id)
- run_as_background_process(
- "shutdown_and_purge_room",
- self._shutdown_and_purge_room,
- delete_id,
- room_id,
- requester_user_id,
- new_room_user_id,
- new_room_name,
- message,
- block,
- purge,
- force_purge,
- )
return delete_id
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7a762c8511..a0c3b16819 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -20,7 +20,7 @@ import random
import string
from collections import OrderedDict
from http import HTTPStatus
-from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple
import attr
from typing_extensions import TypedDict
@@ -54,11 +54,11 @@ from synapse.events import EventBase
from synapse.events.snapshot import UnpersistedEventContext
from synapse.events.utils import copy_and_fixup_power_levels_contents
from synapse.handlers.relations import BundledAggregations
-from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
+ JsonMapping,
MutableStateMap,
Requester,
RoomAlias,
@@ -454,7 +454,7 @@ class RoomCreationHandler:
spam_check = await self._spam_checker_module_callbacks.user_may_create_room(
user_id
)
- if spam_check != NOT_SPAM:
+ if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
raise SynapseError(
403,
"You are not permitted to create rooms",
@@ -768,7 +768,7 @@ class RoomCreationHandler:
spam_check = await self._spam_checker_module_callbacks.user_may_create_room(
user_id
)
- if spam_check != NOT_SPAM:
+ if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
raise SynapseError(
403,
"You are not permitted to create rooms",
@@ -1750,6 +1750,45 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
return self.store.get_current_room_stream_token_for_room_id(room_id)
+class ShutdownRoomParams(TypedDict):
+ """
+ Attributes:
+ requester_user_id:
+ User who requested the action. Will be recorded as putting the room on the
+ blocking list.
+ new_room_user_id:
+ If set, a new room will be created with this user ID
+ as the creator and admin, and all users in the old room will be
+ moved into that room. If not set, no new room will be created
+ and the users will just be removed from the old room.
+ new_room_name:
+ A string representing the name of the room that new users will
+ be invited to. Defaults to `Content Violation Notification`
+ message:
+ A string containing the first message that will be sent as
+ `new_room_user_id` in the new room. Ideally this will clearly
+ convey why the original room was shut down.
+ Defaults to `Sharing illegal content on this server is not
+ permitted and rooms in violation will be blocked.`
+ block:
+ If set to `true`, this room will be added to a blocking list,
+ preventing future attempts to join the room. Defaults to `false`.
+ purge:
+ If set to `true`, purge the given room from the database.
+ force_purge:
+ If set to `true`, the room will be purged from database
+ even if there are still users joined to the room.
+ """
+
+ requester_user_id: Optional[str]
+ new_room_user_id: Optional[str]
+ new_room_name: Optional[str]
+ message: Optional[str]
+ block: bool
+ purge: bool
+ force_purge: bool
+
+
class ShutdownRoomResponse(TypedDict):
"""
Attributes:
@@ -1787,12 +1826,12 @@ class RoomShutdownHandler:
async def shutdown_room(
self,
room_id: str,
- requester_user_id: Optional[str],
- new_room_user_id: Optional[str] = None,
- new_room_name: Optional[str] = None,
- message: Optional[str] = None,
- block: bool = False,
- ) -> ShutdownRoomResponse:
+ params: ShutdownRoomParams,
+ result: Optional[ShutdownRoomResponse] = None,
+ update_result_fct: Optional[
+ Callable[[Optional[JsonMapping]], Awaitable[None]]
+ ] = None,
+ ) -> Optional[ShutdownRoomResponse]:
"""
Shuts down a room. Moves all local users and room aliases automatically
to a new room if `new_room_user_id` is set. Otherwise local users only
@@ -1808,52 +1847,23 @@ class RoomShutdownHandler:
Args:
room_id: The ID of the room to shut down.
- requester_user_id:
- User who requested the action and put the room on the
- blocking list.
- If None, the action was not manually requested but instead
- triggered automatically, e.g. through a Synapse module
- or some other policy.
- MUST NOT be None if block=True.
- new_room_user_id:
- If set, a new room will be created with this user ID
- as the creator and admin, and all users in the old room will be
- moved into that room. If not set, no new room will be created
- and the users will just be removed from the old room.
- new_room_name:
- A string representing the name of the room that new users will
- be invited to. Defaults to `Content Violation Notification`
- message:
- A string containing the first message that will be sent as
- `new_room_user_id` in the new room. Ideally this will clearly
- convey why the original room was shut down.
- Defaults to `Sharing illegal content on this server is not
- permitted and rooms in violation will be blocked.`
- block:
- If set to `True`, users will be prevented from joining the old
- room. This option can also be used to pre-emptively block a room,
- even if it's unknown to this homeserver. In this case, the room
- will be blocked, and no further action will be taken. If `False`,
- attempting to delete an unknown room is invalid.
-
- Defaults to `False`.
-
- Returns: a dict containing the following keys:
- kicked_users: An array of users (`user_id`) that were kicked.
- failed_to_kick_users:
- An array of users (`user_id`) that that were not kicked.
- local_aliases:
- An array of strings representing the local aliases that were
- migrated from the old room to the new.
- new_room_id:
- A string representing the room ID of the new room, or None if
- no such room was created.
- """
+ delete_id: The delete ID identifying this delete request
+ params: parameters for the shutdown, cf `ShutdownRoomParams`
+ result: current status of the shutdown, if it was interrupted
+ update_result_fct: function called when `result` is updated locally
- if not new_room_name:
- new_room_name = self.DEFAULT_ROOM_NAME
- if not message:
- message = self.DEFAULT_MESSAGE
+ Returns: a dict matching `ShutdownRoomResponse`.
+ """
+ requester_user_id = params["requester_user_id"]
+ new_room_user_id = params["new_room_user_id"]
+ block = params["block"]
+
+ new_room_name = (
+ params["new_room_name"]
+ if params["new_room_name"]
+ else self.DEFAULT_ROOM_NAME
+ )
+ message = params["message"] if params["message"] else self.DEFAULT_MESSAGE
if not RoomID.is_valid(room_id):
raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
@@ -1865,6 +1875,17 @@ class RoomShutdownHandler:
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
)
+ result = (
+ result
+ if result
+ else {
+ "kicked_users": [],
+ "failed_to_kick_users": [],
+ "local_aliases": [],
+ "new_room_id": None,
+ }
+ )
+
# Action the block first (even if the room doesn't exist yet)
if block:
if requester_user_id is None:
@@ -1877,14 +1898,10 @@ class RoomShutdownHandler:
if not await self.store.get_room(room_id):
# if we don't know about the room, there is nothing left to do.
- return {
- "kicked_users": [],
- "failed_to_kick_users": [],
- "local_aliases": [],
- "new_room_id": None,
- }
+ return result
- if new_room_user_id is not None:
+ new_room_id = result.get("new_room_id")
+ if new_room_user_id is not None and new_room_id is None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
400, "User must be our own: %s" % (new_room_user_id,)
@@ -1904,6 +1921,10 @@ class RoomShutdownHandler:
ratelimit=False,
)
+ result["new_room_id"] = new_room_id
+ if update_result_fct:
+ await update_result_fct(result)
+
logger.info(
"Shutting down room %r, joining to new room: %r", room_id, new_room_id
)
@@ -1917,12 +1938,9 @@ class RoomShutdownHandler:
stream_id,
)
else:
- new_room_id = None
logger.info("Shutting down room %r", room_id)
users = await self.store.get_users_in_room(room_id)
- kicked_users = []
- failed_to_kick_users = []
for user_id in users:
if not self.hs.is_mine_id(user_id):
continue
@@ -1951,7 +1969,9 @@ class RoomShutdownHandler:
stream_id,
)
- await self.room_member_handler.forget(target_requester.user, room_id)
+ await self.room_member_handler.forget(
+ target_requester.user, room_id, do_not_schedule_purge=True
+ )
# Join users to new room
if new_room_user_id:
@@ -1966,15 +1986,23 @@ class RoomShutdownHandler:
require_consent=False,
)
- kicked_users.append(user_id)
+ result["kicked_users"].append(user_id)
+ if update_result_fct:
+ await update_result_fct(result)
except Exception:
logger.exception(
"Failed to leave old room and join new room for %r", user_id
)
- failed_to_kick_users.append(user_id)
+ result["failed_to_kick_users"].append(user_id)
+ if update_result_fct:
+ await update_result_fct(result)
# Send message in new room and move aliases
if new_room_user_id:
+ room_creator_requester = create_requester(
+ new_room_user_id, authenticated_entity=requester_user_id
+ )
+
await self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
@@ -1986,18 +2014,15 @@ class RoomShutdownHandler:
ratelimit=False,
)
- aliases_for_room = await self.store.get_aliases_for_room(room_id)
+ result["local_aliases"] = list(
+ await self.store.get_aliases_for_room(room_id)
+ )
assert new_room_id is not None
await self.store.update_aliases_for_room(
room_id, new_room_id, requester_user_id
)
else:
- aliases_for_room = []
+ result["local_aliases"] = []
- return {
- "kicked_users": kicked_users,
- "failed_to_kick_users": failed_to_kick_users,
- "local_aliases": list(aliases_for_room),
- "new_room_id": new_room_id,
- }
+ return result
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index de0f04e3fe..90343c2306 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -37,13 +37,13 @@ from synapse.api.ratelimiting import Ratelimiter
from synapse.event_auth import get_named_level, get_power_level_event
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.handlers.pagination import PURGE_ROOM_ACTION_NAME
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.module_api import NOT_SPAM
from synapse.types import (
JsonDict,
Requester,
@@ -169,6 +169,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
+ self._forgotten_room_retention_period = (
+ hs.config.server.forgotten_room_retention_period
+ )
+
def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.
@@ -278,7 +282,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
"""
raise NotImplementedError()
- async def forget(self, user: UserID, room_id: str) -> None:
+ async def forget(
+ self, user: UserID, room_id: str, do_not_schedule_purge: bool = False
+ ) -> None:
user_id = user.to_string()
member = await self._storage_controllers.state.get_current_state_event(
@@ -298,6 +304,20 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# the table `current_state_events` and `get_current_state_events` is `None`.
await self.store.forget(user_id, room_id)
+ # If everyone locally has left the room, then there is no reason for us to keep the
+ # room around and we automatically purge room after a little bit
+ if (
+ not do_not_schedule_purge
+ and self._forgotten_room_retention_period
+ and await self.store.is_locally_forgotten_room(room_id)
+ ):
+ await self.hs.get_task_scheduler().schedule_task(
+ PURGE_ROOM_ACTION_NAME,
+ resource_id=room_id,
+ timestamp=self.clock.time_msec()
+ + self._forgotten_room_retention_period,
+ )
+
async def ratelimit_multiple_invites(
self,
requester: Optional[Requester],
@@ -804,7 +824,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
spam_check = await self._spam_checker_module_callbacks.user_may_invite(
requester.user.to_string(), target_id, room_id
)
- if spam_check != NOT_SPAM:
+ if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
logger.info("Blocking invite due to spam checker")
block_invite_result = spam_check
@@ -939,7 +959,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
target.to_string(), room_id, is_invited=inviter is not None
)
)
- if spam_check != NOT_SPAM:
+ if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
raise SynapseError(
403,
"Not allowed to join this room",
@@ -1557,7 +1577,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
room_id=room_id,
)
)
- if spam_check != NOT_SPAM:
+ if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
raise SynapseError(
403,
"Cannot send threepid invite",
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 7ec202be23..65e2aca456 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -1741,7 +1741,18 @@ class ModuleApi:
"""
# Future extensions to this method might want to e.g. allow use of `force_purge`.
# TODO In the future we should make sure this is persistent.
- self._hs.get_pagination_handler().start_shutdown_and_purge_room(room_id, None)
+ await self._hs.get_pagination_handler().start_shutdown_and_purge_room(
+ room_id,
+ {
+ "new_room_user_id": None,
+ "new_room_name": None,
+ "message": None,
+ "requester_user_id": None,
+ "block": False,
+ "purge": True,
+ "force_purge": False,
+ },
+ )
async def set_displayname(
self,
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 0d42c89ff7..7d0b4b55a0 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -21,6 +21,7 @@ from http import HTTPStatus
from typing import TYPE_CHECKING, Optional, Tuple
from synapse.api.errors import Codes, NotFoundError, SynapseError
+from synapse.handlers.pagination import PURGE_HISTORY_ACTION_NAME
from synapse.http.server import HttpServer, JsonResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
@@ -93,7 +94,7 @@ from synapse.rest.admin.users import (
UserTokenRestServlet,
WhoisRestServlet,
)
-from synapse.types import JsonDict, RoomStreamToken
+from synapse.types import JsonDict, RoomStreamToken, TaskStatus
from synapse.util import SYNAPSE_VERSION
if TYPE_CHECKING:
@@ -196,7 +197,7 @@ class PurgeHistoryRestServlet(RestServlet):
errcode=Codes.BAD_JSON,
)
- purge_id = self.pagination_handler.start_purge_history(
+ purge_id = await self.pagination_handler.start_purge_history(
room_id, token, delete_local_events=delete_local_events
)
@@ -215,11 +216,20 @@ class PurgeHistoryStatusRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
- purge_status = self.pagination_handler.get_purge_status(purge_id)
- if purge_status is None:
+ purge_task = await self.pagination_handler.get_delete_task(purge_id)
+ if purge_task is None or purge_task.action != PURGE_HISTORY_ACTION_NAME:
raise NotFoundError("purge id '%s' not found" % purge_id)
- return HTTPStatus.OK, purge_status.asdict()
+ result: JsonDict = {
+ "status": purge_task.status
+ if purge_task.status == TaskStatus.COMPLETE
+ or purge_task.status == TaskStatus.FAILED
+ else "active",
+ }
+ if purge_task.error:
+ result["error"] = purge_task.error
+
+ return HTTPStatus.OK, result
########################################################################################
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 1d65560265..436718c8b2 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -19,6 +19,10 @@ from urllib import parse as urlparse
from synapse.api.constants import Direction, EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.api.filtering import Filter
+from synapse.handlers.pagination import (
+ PURGE_ROOM_ACTION_NAME,
+ SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME,
+)
from synapse.http.servlet import (
ResolveRoomIdMixin,
RestServlet,
@@ -36,7 +40,7 @@ from synapse.rest.admin._base import (
)
from synapse.storage.databases.main.room import RoomSortOrder
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, RoomID, UserID, create_requester
+from synapse.types import JsonDict, RoomID, ScheduledTask, UserID, create_requester
from synapse.types.state import StateFilter
from synapse.util import json_decoder
@@ -117,20 +121,30 @@ class RoomRestV2Servlet(RestServlet):
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
)
- delete_id = self._pagination_handler.start_shutdown_and_purge_room(
+ delete_id = await self._pagination_handler.start_shutdown_and_purge_room(
room_id=room_id,
- new_room_user_id=content.get("new_room_user_id"),
- new_room_name=content.get("room_name"),
- message=content.get("message"),
- requester_user_id=requester.user.to_string(),
- block=block,
- purge=purge,
- force_purge=force_purge,
+ shutdown_params={
+ "new_room_user_id": content.get("new_room_user_id"),
+ "new_room_name": content.get("room_name"),
+ "message": content.get("message"),
+ "requester_user_id": requester.user.to_string(),
+ "block": block,
+ "purge": purge,
+ "force_purge": force_purge,
+ },
)
return HTTPStatus.OK, {"delete_id": delete_id}
+def _convert_delete_task_to_response(task: ScheduledTask) -> JsonDict:
+ return {
+ "delete_id": task.id,
+ "status": task.status,
+ "shutdown_room": task.result,
+ }
+
+
class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
"""Get the status of the delete room background task."""
@@ -150,21 +164,16 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,)
)
- delete_ids = self._pagination_handler.get_delete_ids_by_room(room_id)
- if delete_ids is None:
- raise NotFoundError("No delete task for room_id '%s' found" % room_id)
+ delete_tasks = await self._pagination_handler.get_delete_tasks_by_room(room_id)
- response = []
- for delete_id in delete_ids:
- delete = self._pagination_handler.get_delete_status(delete_id)
- if delete:
- response += [
- {
- "delete_id": delete_id,
- **delete.asdict(),
- }
- ]
- return HTTPStatus.OK, {"results": cast(JsonDict, response)}
+ if delete_tasks:
+ return HTTPStatus.OK, {
+ "results": [
+ _convert_delete_task_to_response(task) for task in delete_tasks
+ ],
+ }
+ else:
+ raise NotFoundError("No delete task for room_id '%s' found" % room_id)
class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
@@ -181,11 +190,14 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)
- delete_status = self._pagination_handler.get_delete_status(delete_id)
- if delete_status is None:
+ delete_task = await self._pagination_handler.get_delete_task(delete_id)
+ if delete_task is None or (
+ delete_task.action != PURGE_ROOM_ACTION_NAME
+ and delete_task.action != SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME
+ ):
raise NotFoundError("delete id '%s' not found" % delete_id)
- return HTTPStatus.OK, cast(JsonDict, delete_status.asdict())
+ return HTTPStatus.OK, _convert_delete_task_to_response(delete_task)
class ListRoomRestServlet(RestServlet):
@@ -349,11 +361,15 @@ class RoomRestServlet(RestServlet):
ret = await room_shutdown_handler.shutdown_room(
room_id=room_id,
- new_room_user_id=content.get("new_room_user_id"),
- new_room_name=content.get("room_name"),
- message=content.get("message"),
- requester_user_id=requester.user.to_string(),
- block=block,
+ params={
+ "new_room_user_id": content.get("new_room_user_id"),
+ "new_room_name": content.get("room_name"),
+ "message": content.get("message"),
+ "requester_user_id": requester.user.to_string(),
+ "block": block,
+ "purge": purge,
+ "force_purge": force_purge,
+ },
)
# Purge room
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index eb50086c50..6ed451d7c4 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -15,26 +15,34 @@ import json
import time
import urllib.parse
from typing import List, Optional
-from unittest.mock import Mock
+from unittest.mock import AsyncMock, Mock
from parameterized import parameterized
+from twisted.internet.task import deferLater
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import EventTypes, Membership, RoomTypes
from synapse.api.errors import Codes
-from synapse.handlers.pagination import PaginationHandler, PurgeStatus
+from synapse.handlers.pagination import (
+ PURGE_ROOM_ACTION_NAME,
+ SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME,
+)
from synapse.rest.client import directory, events, login, room
from synapse.server import HomeServer
+from synapse.types import UserID
from synapse.util import Clock
-from synapse.util.stringutils import random_string
+from synapse.util.task_scheduler import TaskScheduler
from tests import unittest
"""Tests admin REST events for /rooms paths."""
+ONE_HOUR_IN_S = 3600
+
+
class DeleteRoomTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
@@ -46,6 +54,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_creation_handler = hs.get_event_creation_handler()
+ self.task_scheduler = hs.get_task_scheduler()
hs.config.consent.user_consent_version = "1"
consent_uri_builder = Mock()
@@ -476,6 +485,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_creation_handler = hs.get_event_creation_handler()
+ self.task_scheduler = hs.get_task_scheduler()
hs.config.consent.user_consent_version = "1"
consent_uri_builder = Mock()
@@ -502,6 +512,9 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
)
self.url_status_by_delete_id = "/_synapse/admin/v2/rooms/delete_status/"
+ self.room_member_handler = hs.get_room_member_handler()
+ self.pagination_handler = hs.get_pagination_handler()
+
@parameterized.expand(
[
("DELETE", "/_synapse/admin/v2/rooms/%s"),
@@ -661,7 +674,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
delete_id1 = channel.json_body["delete_id"]
# go ahead
- self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
+ self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
# second task
channel = self.make_request(
@@ -686,12 +699,14 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(2, len(channel.json_body["results"]))
self.assertEqual("complete", channel.json_body["results"][0]["status"])
self.assertEqual("complete", channel.json_body["results"][1]["status"])
- self.assertEqual(delete_id1, channel.json_body["results"][0]["delete_id"])
- self.assertEqual(delete_id2, channel.json_body["results"][1]["delete_id"])
+ delete_ids = {delete_id1, delete_id2}
+ self.assertTrue(channel.json_body["results"][0]["delete_id"] in delete_ids)
+ delete_ids.remove(channel.json_body["results"][0]["delete_id"])
+ self.assertTrue(channel.json_body["results"][1]["delete_id"] in delete_ids)
# get status after more than clearing time for first task
# second task is not cleared
- self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
+ self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
channel = self.make_request(
"GET",
@@ -705,7 +720,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(delete_id2, channel.json_body["results"][0]["delete_id"])
# get status after more than clearing time for all tasks
- self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
+ self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
channel = self.make_request(
"GET",
@@ -721,6 +736,13 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
body = {"new_room_user_id": self.admin_user}
+ # Mock PaginationHandler.purge_room to sleep for 100s, so we have time to do a second call
+ # before the purge is over. Note that it doesn't purge anymore, but we don't care.
+ async def purge_room(room_id: str, force: bool) -> None:
+ await deferLater(self.hs.get_reactor(), 100, lambda: None)
+
+ self.pagination_handler.purge_room = AsyncMock(side_effect=purge_room) # type: ignore[method-assign]
+
# first call to delete room
# and do not wait for finish the task
first_channel = self.make_request(
@@ -728,7 +750,6 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.url.encode("ascii"),
content=body,
access_token=self.admin_user_tok,
- await_result=False,
)
# second call to delete room
@@ -742,7 +763,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(400, second_channel.code, msg=second_channel.json_body)
self.assertEqual(Codes.UNKNOWN, second_channel.json_body["errcode"])
self.assertEqual(
- f"History purge already in progress for {self.room_id}",
+ f"Purge already in progress for {self.room_id}",
second_channel.json_body["error"],
)
@@ -751,6 +772,9 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(200, first_channel.code, msg=first_channel.json_body)
self.assertIn("delete_id", first_channel.json_body)
+ # wait for purge_room to finish
+ self.pump(1)
+
# check status after finish the task
self._test_result(
first_channel.json_body["delete_id"],
@@ -972,6 +996,115 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
# Assert we can no longer peek into the room
self._assert_peek(self.room_id, expect_code=403)
+ @unittest.override_config({"forgotten_room_retention_period": "1d"})
+ def test_purge_forgotten_room(self) -> None:
+ # Create a test room
+ room_id = self.helper.create_room_as(
+ self.admin_user,
+ tok=self.admin_user_tok,
+ )
+
+ self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
+ self.get_success(
+ self.room_member_handler.forget(
+ UserID.from_string(self.admin_user), room_id
+ )
+ )
+
+ # Test that room is not yet purged
+ with self.assertRaises(AssertionError):
+ self._is_purged(room_id)
+
+ # Advance 24 hours in the future, past the `forgotten_room_retention_period`
+ self.reactor.advance(24 * ONE_HOUR_IN_S)
+
+ self._is_purged(room_id)
+
+ def test_scheduled_purge_room(self) -> None:
+ # Create a test room
+ room_id = self.helper.create_room_as(
+ self.admin_user,
+ tok=self.admin_user_tok,
+ )
+ self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
+
+ # Schedule a purge 10 seconds in the future
+ self.get_success(
+ self.task_scheduler.schedule_task(
+ PURGE_ROOM_ACTION_NAME,
+ resource_id=room_id,
+ timestamp=self.clock.time_msec() + 10 * 1000,
+ )
+ )
+
+ # Test that room is not yet purged
+ with self.assertRaises(AssertionError):
+ self._is_purged(room_id)
+
+ # Wait for next scheduler run
+ self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS)
+
+ self._is_purged(room_id)
+
+ def test_schedule_shutdown_room(self) -> None:
+ # Create a test room
+ room_id = self.helper.create_room_as(
+ self.other_user,
+ tok=self.other_user_tok,
+ )
+
+ # Schedule a shutdown 10 seconds in the future
+ delete_id = self.get_success(
+ self.task_scheduler.schedule_task(
+ SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME,
+ resource_id=room_id,
+ params={
+ "requester_user_id": self.admin_user,
+ "new_room_user_id": self.admin_user,
+ "new_room_name": None,
+ "message": None,
+ "block": False,
+ "purge": True,
+ "force_purge": True,
+ },
+ timestamp=self.clock.time_msec() + 10 * 1000,
+ )
+ )
+
+ # Test that room is not yet shutdown
+ self._is_member(room_id, self.other_user)
+
+ # Test that room is not yet purged
+ with self.assertRaises(AssertionError):
+ self._is_purged(room_id)
+
+ # Wait for next scheduler run
+ self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS)
+
+ # Test that all users has been kicked (room is shutdown)
+ self._has_no_members(room_id)
+
+ self._is_purged(room_id)
+
+ # Retrieve delete results
+ result = self.make_request(
+ "GET",
+ self.url_status_by_delete_id + delete_id,
+ access_token=self.admin_user_tok,
+ )
+ self.assertEqual(200, result.code, msg=result.json_body)
+
+ # Check that the user is in kicked_users
+ self.assertIn(
+ self.other_user, result.json_body["shutdown_room"]["kicked_users"]
+ )
+
+ new_room_id = result.json_body["shutdown_room"]["new_room_id"]
+ self.assertTrue(new_room_id)
+
+ # Check that the user is actually in the new room
+ self._is_member(new_room_id, self.other_user)
+
def _is_blocked(self, room_id: str, expect: bool = True) -> None:
"""Assert that the room is blocked or not"""
d = self.store.is_room_blocked(room_id)
@@ -1034,7 +1167,6 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
kicked_user: a user_id which is kicked from the room
expect_new_room: if we expect that a new room was created
"""
-
# get information by room_id
channel_room_id = self.make_request(
"GET",
@@ -1957,11 +2089,8 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
# Purge every event before the second event.
- purge_id = random_string(16)
- pagination_handler._purges_by_id[purge_id] = PurgeStatus()
self.get_success(
- pagination_handler._purge_history(
- purge_id=purge_id,
+ pagination_handler.purge_history(
room_id=self.room_id,
token=second_token_str,
delete_local_events=True,
diff --git a/tests/rest/admin/test_server_notice.py b/tests/rest/admin/test_server_notice.py
index 28b999573e..dfd14f5751 100644
--- a/tests/rest/admin/test_server_notice.py
+++ b/tests/rest/admin/test_server_notice.py
@@ -22,6 +22,7 @@ from synapse.server import HomeServer
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict
from synapse.util import Clock
+from synapse.util.stringutils import random_string
from tests import unittest
from tests.unittest import override_config
@@ -413,11 +414,24 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
self.assertEqual(messages[0]["content"]["body"], "test msg one")
self.assertEqual(messages[0]["sender"], "@notices:test")
+ random_string(16)
+
# shut down and purge room
self.get_success(
- self.room_shutdown_handler.shutdown_room(first_room_id, self.admin_user)
- )
- self.get_success(self.pagination_handler.purge_room(first_room_id))
+ self.room_shutdown_handler.shutdown_room(
+ first_room_id,
+ {
+ "requester_user_id": self.admin_user,
+ "new_room_user_id": None,
+ "new_room_name": None,
+ "message": None,
+ "block": False,
+ "purge": True,
+ "force_purge": False,
+ },
+ )
+ )
+ self.get_success(self.pagination_handler.purge_room(first_room_id, force=False))
# user is not member anymore
self._check_invite_and_join_status(self.other_user, 0, 0)
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index 47c1d38ad7..7627823d3f 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -41,7 +41,6 @@ from synapse.api.errors import Codes, HttpResponseException
from synapse.appservice import ApplicationService
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
-from synapse.handlers.pagination import PurgeStatus
from synapse.rest import admin
from synapse.rest.client import account, directory, login, profile, register, room, sync
from synapse.server import HomeServer
@@ -2086,11 +2085,8 @@ class RoomMessageListTestCase(RoomBase):
self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
# Purge every event before the second event.
- purge_id = random_string(16)
- pagination_handler._purges_by_id[purge_id] = PurgeStatus()
self.get_success(
- pagination_handler._purge_history(
- purge_id=purge_id,
+ pagination_handler.purge_history(
room_id=self.room_id,
token=second_token_str,
delete_local_events=True,
|