diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 982b38cc43..c52676709d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -12,9 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple
from twisted.python.failure import Failure
@@ -22,12 +21,19 @@ from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
-from synapse.handlers.room import DeleteStatus, ShutdownRoomParams, ShutdownRoomResponse
+from synapse.handlers.room import ShutdownRoomParams
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StreamKeyType
+from synapse.types import (
+ JsonDict,
+ JsonMapping,
+ Requester,
+ ScheduledTask,
+ StreamKeyType,
+ TaskStatus,
+)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
@@ -52,12 +58,6 @@ class PaginationHandler:
paginating during a purge.
"""
- # when to remove a completed deletion/purge from the results map
- CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
-
- # how often to run the purge rooms loop
- PURGE_ROOMS_INTERVAL_MS = 1000 * 3600 # 1 hour
-
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
@@ -68,6 +68,7 @@ class PaginationHandler:
self._server_name = hs.hostname
self._room_shutdown_handler = hs.get_room_shutdown_handler()
self._relations_handler = hs.get_relations_handler()
+ self._task_scheduler = hs.get_task_scheduler()
self.pagination_lock = ReadWriteLock()
# IDs of rooms in which there currently an active purge *or delete* operation.
@@ -101,95 +102,11 @@ class PaginationHandler:
job.longest_max_lifetime,
)
- if self._is_master:
- self.clock.looping_call(
- run_as_background_process,
- PaginationHandler.PURGE_ROOMS_INTERVAL_MS,
- "purge_rooms",
- self.purge_rooms,
- )
-
- async def purge_rooms(self) -> None:
- """This takes care of restoring unfinished purge/shutdown rooms from the DB.
- It also takes care to launch scheduled ones, like rooms that has been fully
- forgotten.
-
- It should be run regularly.
- """
- rooms_to_delete = await self.store.get_rooms_to_delete()
- for r in rooms_to_delete:
- room_id = r["room_id"]
- delete_id = r["delete_id"]
- status = r["status"]
- action = r["action"]
- timestamp = r["timestamp"]
-
- if (
- status == DeleteStatus.STATUS_COMPLETE
- or status == DeleteStatus.STATUS_FAILED
- ):
- # remove the delete from the list 24 hours after it completes or fails
- ms_since_completed = self.clock.time_msec() - timestamp
- if ms_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS:
- await self.store.delete_room_to_delete(room_id, delete_id)
-
- continue
-
- if room_id in self._purges_in_progress_by_room:
- # a delete background task is already running (or has run)
- # for this room id, let's ignore it for now
- continue
-
- # If the database says we were last in the middle of shutting down the room,
- # let's continue the shutdown process.
- shutdown_response = None
- if (
- action == DeleteStatus.ACTION_SHUTDOWN
- and status == DeleteStatus.STATUS_SHUTTING_DOWN
- ):
- shutdown_params = json.loads(r["params"])
- if r["response"]:
- shutdown_response = json.loads(r["response"])
- await self._shutdown_and_purge_room(
- room_id,
- delete_id,
- shutdown_params=shutdown_params,
- shutdown_response=shutdown_response,
- )
- continue
-
- # If the database says we were last in the middle of purging the room,
- # let's continue the purge process.
- if status == DeleteStatus.STATUS_PURGING:
- purge_now = True
- # Or if we're at or past the scheduled purge time, let's start that one as well
- elif status == DeleteStatus.STATUS_SCHEDULED and (
- timestamp is None or self.clock.time_msec() >= timestamp
- ):
- purge_now = True
-
- # TODO 2 stages purge, keep memberships for a while so we don't "break" sync
- if purge_now:
- params = {}
- if r["params"]:
- params = json.loads(r["params"])
-
- if action == DeleteStatus.ACTION_PURGE_HISTORY:
- if "token" in params:
- await self._purge_history(
- delete_id,
- room_id,
- params["token"],
- params.get("delete_local_events", False),
- True,
- )
- elif action == DeleteStatus.ACTION_PURGE:
- await self.purge_room(
- room_id,
- delete_id,
- params.get("force", False),
- shutdown_response=shutdown_response,
- )
+ self._task_scheduler.register_action(self._purge_history, "purge_history")
+ self._task_scheduler.register_action(self._purge_room, "purge_room")
+ self._task_scheduler.register_action(
+ self._shutdown_and_purge_room, "shutdown_and_purge_room"
+ )
async def purge_history_for_rooms_in_range(
self, min_ms: Optional[int], max_ms: Optional[int]
@@ -241,14 +158,6 @@ class PaginationHandler:
for room_id, retention_policy in rooms.items():
logger.info("[purge] Attempting to purge messages in room %s", room_id)
- if room_id in self._purges_in_progress_by_room:
- logger.warning(
- "[purge] not purging room %s as there's an ongoing purge running"
- " for this room",
- room_id,
- )
- continue
-
# If max_lifetime is None, it means that the room has no retention policy.
# Given we only retrieve such rooms when there's a default retention policy
# defined in the server's configuration, we can safely assume that's the
@@ -330,46 +239,49 @@ class PaginationHandler:
Returns:
unique ID for this purge transaction.
"""
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400, "History purge already in progress for %s" % (room_id,)
- )
-
- purge_id = random_string(16)
+ purge_id = await self._task_scheduler.schedule_task(
+ "purge_history",
+ resource_id=room_id,
+ params={"token": token, "delete_local_events": delete_local_events},
+ )
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger.info("[purge] starting purge_id %s", purge_id)
- await self.store.upsert_room_to_delete(
- room_id,
- purge_id,
- DeleteStatus.ACTION_PURGE_HISTORY,
- DeleteStatus.STATUS_PURGING,
- params=json.dumps(
- {"token": token, "delete_local_events": delete_local_events}
- ),
- )
-
- run_as_background_process(
- "purge_history",
- self._purge_history,
- purge_id,
- room_id,
- token,
- delete_local_events,
- True,
- )
return purge_id
async def _purge_history(
self,
- purge_id: str,
+ task: ScheduledTask,
+ first_launch: bool,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ if (
+ task.resource_id is None
+ or task.params is None
+ or "token" not in task.params
+ or "delete_local_events" not in task.params
+ ):
+ return (
+ TaskStatus.FAILED,
+ None,
+ "Not enough parameters passed to _purge_history",
+ )
+ err = await self.purge_history(
+ task.resource_id,
+ task.params["token"],
+ task.params["delete_local_events"],
+ )
+ if err is not None:
+ return TaskStatus.FAILED, None, err
+ return TaskStatus.COMPLETE, None, None
+
+ async def purge_history(
+ self,
room_id: str,
token: str,
delete_local_events: bool,
- update_rooms_to_delete_table: bool,
- ) -> None:
+ ) -> Optional[str]:
"""Carry out a history purge on a room.
Args:
@@ -382,88 +294,54 @@ class PaginationHandler:
functionality since we don't need to explicitly restore those, they
will be relaunch by the retention logic.
"""
- self._purges_in_progress_by_room.add(room_id)
try:
async with self.pagination_lock.write(room_id):
await self._storage_controllers.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete")
- if update_rooms_to_delete_table:
- await self.store.upsert_room_to_delete(
- room_id,
- purge_id,
- DeleteStatus.ACTION_PURGE_HISTORY,
- DeleteStatus.STATUS_COMPLETE,
- timestamp=self.clock.time_msec(),
- )
+ return None
except Exception:
f = Failure()
logger.error(
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
)
- if update_rooms_to_delete_table:
- await self.store.upsert_room_to_delete(
- room_id,
- purge_id,
- DeleteStatus.ACTION_PURGE_HISTORY,
- DeleteStatus.STATUS_FAILED,
- error=f.getErrorMessage(),
- timestamp=self.clock.time_msec(),
- )
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- if update_rooms_to_delete_table:
- # remove the purge from the list 24 hours after it completes
- async def clear_purge() -> None:
- await self.store.delete_room_to_delete(room_id, purge_id)
-
- self.hs.get_reactor().callLater(
- PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
- )
+ return f.getErrorMessage()
- @staticmethod
- def _convert_to_delete_status(res: Dict[str, Any]) -> DeleteStatus:
- status = DeleteStatus()
- status.delete_id = res["delete_id"]
- status.action = res["action"]
- status.status = res["status"]
- if "error" in res:
- status.error = res["error"]
-
- if status.action == DeleteStatus.ACTION_SHUTDOWN and res["response"]:
- status.shutdown_room = json.loads(res["response"])
-
- return status
-
- async def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
+ async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]:
"""Get the current status of an active deleting
Args:
delete_id: delete_id returned by start_shutdown_and_purge_room
or start_purge_history.
"""
- res = await self.store.get_room_to_delete(delete_id)
- if res:
- return PaginationHandler._convert_to_delete_status(res)
- return None
+ return await self._task_scheduler.get_task(delete_id)
- async def get_delete_statuses_by_room(self, room_id: str) -> List[DeleteStatus]:
+ async def get_delete_tasks_by_room(self, room_id: str) -> List[ScheduledTask]:
"""Get all active delete statuses by room
Args:
room_id: room_id that is deleted
"""
- res = await self.store.get_rooms_to_delete(room_id)
- return [PaginationHandler._convert_to_delete_status(r) for r in res]
+ return await self._task_scheduler.get_tasks(
+ actions=["purge_room", "shutdown_and_purge_room"], resource_ids=[room_id]
+ )
+
+ async def _purge_room(
+ self,
+ task: ScheduledTask,
+ first_launch: bool,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ if not task.resource_id:
+ raise Exception("No room id passed to purge_room task")
+ params = task.params if task.params else {}
+ await self.purge_room(task.resource_id, params.get("force", False))
+ return TaskStatus.COMPLETE, None, None
async def purge_room(
self,
room_id: str,
- delete_id: str,
- force: bool = False,
- shutdown_response: Optional[ShutdownRoomResponse] = None,
+ force: bool,
) -> None:
"""Purge the given room from the database.
@@ -475,10 +353,6 @@ class PaginationHandler:
"""
logger.info("starting purge room_id=%s force=%s", room_id, force)
- action = DeleteStatus.ACTION_PURGE
- if shutdown_response:
- action = DeleteStatus.ACTION_SHUTDOWN
-
async with self.pagination_lock.write(room_id):
# first check that we have no users in this room
joined = await self.store.is_host_joined(room_id, self._server_name)
@@ -491,25 +365,8 @@ class PaginationHandler:
else:
raise SynapseError(400, "Users are still joined to this room")
- await self.store.upsert_room_to_delete(
- room_id,
- delete_id,
- action,
- DeleteStatus.STATUS_PURGING,
- response=json.dumps(shutdown_response),
- )
-
await self._storage_controllers.purge_events.purge_room(room_id)
- await self.store.upsert_room_to_delete(
- room_id,
- delete_id,
- action,
- DeleteStatus.STATUS_COMPLETE,
- timestamp=self.clock.time_msec(),
- response=json.dumps(shutdown_response),
- )
-
logger.info("purge complete for room_id %s", room_id)
@trace
@@ -789,11 +646,9 @@ class PaginationHandler:
async def _shutdown_and_purge_room(
self,
- room_id: str,
- delete_id: str,
- shutdown_params: ShutdownRoomParams,
- shutdown_response: Optional[ShutdownRoomResponse] = None,
- ) -> None:
+ task: ScheduledTask,
+ first_launch: bool,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
"""
Shuts down and purges a room.
@@ -807,50 +662,36 @@ class PaginationHandler:
Keeps track of the `DeleteStatus` (and `ShutdownRoomResponse`) in `self._delete_by_id` and persisted in DB
"""
-
- self._purges_in_progress_by_room.add(room_id)
- try:
- shutdown_response = await self._room_shutdown_handler.shutdown_room(
- room_id=room_id,
- delete_id=delete_id,
- shutdown_params=shutdown_params,
- shutdown_response=shutdown_response,
+ if task.resource_id is None or task.params is None:
+ raise Exception(
+ "No room id and/or no parameters passed to shutdown_and_purge_room task"
)
- if shutdown_params["purge"]:
- await self.purge_room(
- room_id,
- delete_id,
- shutdown_params["force_purge"],
- shutdown_response=shutdown_response,
- )
+ room_id = task.resource_id
- await self.store.upsert_room_to_delete(
- room_id,
- delete_id,
- DeleteStatus.ACTION_SHUTDOWN,
- DeleteStatus.STATUS_COMPLETE,
- timestamp=self.clock.time_msec(),
- response=json.dumps(shutdown_response),
- )
- except Exception:
- f = Failure()
- logger.error(
- "failed",
- exc_info=(f.type, f.value, f.getTracebackObject()),
- )
- await self.store.upsert_room_to_delete(
+ async def update_result(result: Optional[JsonMapping]) -> None:
+ await self._task_scheduler.update_task(task.id, result=result)
+
+ shutdown_result = await self._room_shutdown_handler.shutdown_room(
+ room_id, task.params, task.result, update_result
+ )
+
+ if task.params.get("purge", False):
+ await self.purge_room(
room_id,
- delete_id,
- DeleteStatus.ACTION_SHUTDOWN,
- DeleteStatus.STATUS_FAILED,
- timestamp=self.clock.time_msec(),
- error=f.getErrorMessage(),
+ task.params.get("force_purge", False),
)
- finally:
- self._purges_in_progress_by_room.discard(room_id)
- def start_shutdown_and_purge_room(
+ return (TaskStatus.COMPLETE, shutdown_result, None)
+
+ async def get_current_delete_tasks(self, room_id: str) -> List[ScheduledTask]:
+ return await self._task_scheduler.get_tasks(
+ actions=["purge_history", "purge_room", "shutdown_and_purge_room"],
+ resource_ids=[room_id],
+ statuses=[TaskStatus.ACTIVE, TaskStatus.SCHEDULED],
+ )
+
+ async def start_shutdown_and_purge_room(
self,
room_id: str,
shutdown_params: ShutdownRoomParams,
@@ -864,7 +705,7 @@ class PaginationHandler:
Returns:
unique ID for this delete transaction.
"""
- if room_id in self._purges_in_progress_by_room:
+ if len(await self.get_current_delete_tasks(room_id)) > 0:
raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
# This check is double to `RoomShutdownHandler.shutdown_room`
@@ -877,7 +718,11 @@ class PaginationHandler:
400, "User must be our own: %s" % (new_room_user_id,)
)
- delete_id = random_string(16)
+ delete_id = await self._task_scheduler.schedule_task(
+ "shutdown_and_purge_room",
+ resource_id=room_id,
+ params=shutdown_params,
+ )
# we log the delete_id here so that it can be tied back to the
# request id in the log lines.
@@ -887,11 +732,4 @@ class PaginationHandler:
delete_id,
)
- run_as_background_process(
- "shutdown_and_purge_room",
- self._shutdown_and_purge_room,
- room_id,
- delete_id,
- shutdown_params,
- )
return delete_id
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f7f9d9d2f5..ce678d41fc 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -14,14 +14,13 @@
"""Contains functions for performing actions on rooms."""
import itertools
-import json
import logging
import math
import random
import string
from collections import OrderedDict
from http import HTTPStatus
-from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple
import attr
from typing_extensions import TypedDict
@@ -59,6 +58,7 @@ from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
+ JsonMapping,
MutableStateMap,
Requester,
RoomAlias,
@@ -1883,10 +1883,12 @@ class RoomShutdownHandler:
async def shutdown_room(
self,
room_id: str,
- delete_id: str,
- shutdown_params: ShutdownRoomParams,
- shutdown_response: Optional[ShutdownRoomResponse] = None,
- ) -> ShutdownRoomResponse:
+ params: JsonMapping,
+ result: Optional[JsonMapping] = None,
+ update_result_fct: Optional[
+ Callable[[Optional[JsonMapping]], Awaitable[None]]
+ ] = None,
+ ) -> Optional[JsonMapping]:
"""
Shuts down a room. Moves all local users and room aliases automatically
to a new room if `new_room_user_id` is set. Otherwise local users only
@@ -1908,21 +1910,16 @@ class RoomShutdownHandler:
Returns: a dict matching `ShutdownRoomResponse`.
"""
-
- requester_user_id = shutdown_params["requester_user_id"]
- new_room_user_id = shutdown_params["new_room_user_id"]
- block = shutdown_params["block"]
+ requester_user_id = params["requester_user_id"]
+ new_room_user_id = params["new_room_user_id"]
+ block = params["block"]
new_room_name = (
- shutdown_params["new_room_name"]
- if shutdown_params["new_room_name"]
+ params["new_room_name"]
+ if params["new_room_name"]
else self.DEFAULT_ROOM_NAME
)
- message = (
- shutdown_params["message"]
- if shutdown_params["message"]
- else self.DEFAULT_MESSAGE
- )
+ message = params["message"] if params["message"] else self.DEFAULT_MESSAGE
if not RoomID.is_valid(room_id):
raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
@@ -1934,21 +1931,15 @@ class RoomShutdownHandler:
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
)
- if not shutdown_response:
- shutdown_response = {
+ result = (
+ dict(result)
+ if result
+ else {
"kicked_users": [],
"failed_to_kick_users": [],
"local_aliases": [],
"new_room_id": None,
}
-
- await self.store.upsert_room_to_delete(
- room_id,
- delete_id,
- DeleteStatus.ACTION_SHUTDOWN,
- DeleteStatus.STATUS_SHUTTING_DOWN,
- params=json.dumps(shutdown_params),
- response=json.dumps(shutdown_response),
)
# Action the block first (even if the room doesn't exist yet)
@@ -1959,9 +1950,9 @@ class RoomShutdownHandler:
if not await self.store.get_room(room_id):
# if we don't know about the room, there is nothing left to do.
- return shutdown_response
+ return result
- new_room_id = shutdown_response.get("new_room_id")
+ new_room_id = result.get("new_room_id")
if new_room_user_id is not None and new_room_id is None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
@@ -1982,15 +1973,9 @@ class RoomShutdownHandler:
ratelimit=False,
)
- shutdown_response["new_room_id"] = new_room_id
- await self.store.upsert_room_to_delete(
- room_id,
- delete_id,
- DeleteStatus.ACTION_SHUTDOWN,
- DeleteStatus.STATUS_SHUTTING_DOWN,
- params=json.dumps(shutdown_params),
- response=json.dumps(shutdown_response),
- )
+ result["new_room_id"] = new_room_id
+ if update_result_fct:
+ await update_result_fct(result)
logger.info(
"Shutting down room %r, joining to new room: %r", room_id, new_room_id
@@ -2053,28 +2038,16 @@ class RoomShutdownHandler:
require_consent=False,
)
- shutdown_response["kicked_users"].append(user_id)
- await self.store.upsert_room_to_delete(
- room_id,
- delete_id,
- DeleteStatus.ACTION_SHUTDOWN,
- DeleteStatus.STATUS_SHUTTING_DOWN,
- params=json.dumps(shutdown_params),
- response=json.dumps(shutdown_response),
- )
+ result["kicked_users"].append(user_id)
+ if update_result_fct:
+ await update_result_fct(result)
except Exception:
logger.exception(
"Failed to leave old room and join new room for %r", user_id
)
- shutdown_response["failed_to_kick_users"].append(user_id)
- await self.store.upsert_room_to_delete(
- room_id,
- delete_id,
- DeleteStatus.ACTION_SHUTDOWN,
- DeleteStatus.STATUS_SHUTTING_DOWN,
- params=json.dumps(shutdown_params),
- response=json.dumps(shutdown_response),
- )
+ result["failed_to_kick_users"].append(user_id)
+ if update_result_fct:
+ await update_result_fct(result)
# Send message in new room and move aliases
if new_room_user_id:
@@ -2093,7 +2066,7 @@ class RoomShutdownHandler:
ratelimit=False,
)
- shutdown_response["local_aliases"] = list(
+ result["local_aliases"] = list(
await self.store.get_aliases_for_room(room_id)
)
@@ -2102,6 +2075,6 @@ class RoomShutdownHandler:
room_id, new_room_id, requester_user_id
)
else:
- shutdown_response["local_aliases"] = []
+ result["local_aliases"] = []
- return shutdown_response
+ return result
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index e3147924f7..db98cf856c 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -38,7 +38,6 @@ from synapse.event_auth import get_named_level, get_power_level_event
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
-from synapse.handlers.room import DeleteStatus
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
@@ -57,7 +56,6 @@ from synapse.types import (
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_left_room
-from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -96,6 +94,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.event_creation_handler = hs.get_event_creation_handler()
self.account_data_handler = hs.get_account_data_handler()
self.event_auth_handler = hs.get_event_auth_handler()
+ self.task_scheduler = hs.get_task_scheduler()
self.member_linearizer: Linearizer = Linearizer(name="member")
self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter")
@@ -318,12 +317,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
and self._purge_retention_period
and await self.store.is_locally_forgotten_room(room_id)
):
- delete_id = random_string(16)
- await self.store.upsert_room_to_delete(
- room_id,
- delete_id,
- DeleteStatus.ACTION_PURGE,
- DeleteStatus.STATUS_SCHEDULED,
+ await self.task_scheduler.schedule_task(
+ "purge_room",
+ resource_id=room_id,
timestamp=self.clock.time_msec() + self._purge_retention_period,
)
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 0cabdd1dc6..fafa9ea428 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -93,7 +93,7 @@ from synapse.rest.admin.users import (
UserTokenRestServlet,
WhoisRestServlet,
)
-from synapse.types import JsonDict, RoomStreamToken
+from synapse.types import JsonDict, RoomStreamToken, TaskStatus
from synapse.util import SYNAPSE_VERSION
if TYPE_CHECKING:
@@ -215,12 +215,21 @@ class PurgeHistoryStatusRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
- purge_status = await self.pagination_handler.get_delete_status(purge_id)
- if purge_status is None:
+ purge_task = await self.pagination_handler.get_delete_task(purge_id)
+ if purge_task is None or purge_task.action != "purge_history":
raise NotFoundError("purge id '%s' not found" % purge_id)
+ result = {
+ "status": purge_task.status
+ if purge_task.status == TaskStatus.COMPLETE
+ or purge_task.status == TaskStatus.FAILED
+ else "active",
+ }
+ if purge_task.error:
+ result["error"] = purge_task.error
+
# TODO active vs purging etc
- return HTTPStatus.OK, purge_status.asdict(use_purge_history_format=True)
+ return HTTPStatus.OK, result
########################################################################################
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 9e31d018b1..62cdb9af38 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -19,7 +19,6 @@ from urllib import parse as urlparse
from synapse.api.constants import Direction, EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.api.filtering import Filter
-from synapse.handlers.room import DeleteStatus
from synapse.http.servlet import (
ResolveRoomIdMixin,
RestServlet,
@@ -37,10 +36,16 @@ from synapse.rest.admin._base import (
)
from synapse.storage.databases.main.room import RoomSortOrder
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, RoomID, UserID, create_requester
+from synapse.types import (
+ JsonDict,
+ RoomID,
+ ScheduledTask,
+ TaskStatus,
+ UserID,
+ create_requester,
+)
from synapse.types.state import StateFilter
from synapse.util import json_decoder
-from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.api.auth import Auth
@@ -119,7 +124,7 @@ class RoomRestV2Servlet(RestServlet):
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
)
- delete_id = self._pagination_handler.start_shutdown_and_purge_room(
+ delete_id = await self._pagination_handler.start_shutdown_and_purge_room(
room_id=room_id,
shutdown_params={
"new_room_user_id": content.get("new_room_user_id"),
@@ -135,6 +140,14 @@ class RoomRestV2Servlet(RestServlet):
return HTTPStatus.OK, {"delete_id": delete_id}
+def _convert_delete_task_to_response(task: ScheduledTask) -> JsonDict:
+ return {
+ "delete_id": task.id,
+ "status": task.status,
+ "shutdown_room": task.result,
+ }
+
+
class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
"""Get the status of the delete room background task."""
@@ -154,16 +167,14 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,)
)
- delete_statuses = await self._pagination_handler.get_delete_statuses_by_room(
- room_id
- )
+ delete_tasks = await self._pagination_handler.get_delete_tasks_by_room(room_id)
response = []
- for delete_status in delete_statuses:
+ for delete_task in delete_tasks:
# We ignore scheduled deletes because currently they are only used
# for automatically purging forgotten room after X time.
- if delete_status.status != DeleteStatus.STATUS_SCHEDULED:
- response += [delete_status.asdict()]
+ if delete_task.status != TaskStatus.SCHEDULED:
+ response += [_convert_delete_task_to_response(delete_task)]
if response:
return HTTPStatus.OK, {"results": cast(JsonDict, response)}
@@ -185,11 +196,14 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)
- delete_status = await self._pagination_handler.get_delete_status(delete_id)
- if delete_status is None:
+ delete_task = await self._pagination_handler.get_delete_task(delete_id)
+ if delete_task is None or (
+ delete_task.action != "purge_room"
+ and delete_task.action != "shutdown_and_purge_room"
+ ):
raise NotFoundError("delete id '%s' not found" % delete_id)
- return HTTPStatus.OK, cast(JsonDict, delete_status.asdict())
+ return HTTPStatus.OK, _convert_delete_task_to_response(delete_task)
class ListRoomRestServlet(RestServlet):
@@ -351,12 +365,9 @@ class RoomRestServlet(RestServlet):
Codes.BAD_JSON,
)
- delete_id = random_string(16)
-
ret = await room_shutdown_handler.shutdown_room(
room_id=room_id,
- delete_id=delete_id,
- shutdown_params={
+ params={
"new_room_user_id": content.get("new_room_user_id"),
"new_room_name": content.get("room_name"),
"message": content.get("message"),
@@ -370,9 +381,7 @@ class RoomRestServlet(RestServlet):
# Purge room
if purge:
try:
- await pagination_handler.purge_room(
- room_id, delete_id, force=force_purge
- )
+ await pagination_handler.purge_room(room_id, force=force_purge)
except NotFoundError:
if block:
# We can block unknown rooms with this endpoint, in which case
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 2c0b573d4c..582875c91a 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -17,7 +17,6 @@ from itertools import chain
from typing import (
TYPE_CHECKING,
AbstractSet,
- Any,
Collection,
Dict,
FrozenSet,
@@ -1284,113 +1283,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
# If any rows still exist it means someone has not forgotten this room yet
return not rows[0][0]
- async def upsert_room_to_delete(
- self,
- room_id: str,
- delete_id: str,
- action: str,
- status: str,
- timestamp: Optional[int] = None,
- params: Optional[str] = None,
- response: Optional[str] = None,
- error: Optional[str] = None,
- ) -> None:
- """Insert or update a room to shutdown/purge.
-
- Args:
- room_id: The room ID to shutdown/purge
- delete_id: The delete ID identifying this action
- action: the type of job, mainly `shutdown` `purge` or `purge_history`
- status: Current status of the delete. Cf `DeleteStatus` for possible values
- timestamp: Time of the last update. If status is `wait_purge`,
- then it specifies when to do the purge, with an empty value specifying ASAP
- error: Error message to return, if any
- params: JSON representation of delete job parameters
- response: JSON representation of delete current status
- """
- await self.db_pool.simple_upsert(
- "rooms_to_delete",
- {
- "room_id": room_id,
- "delete_id": delete_id,
- },
- {
- "action": action,
- "status": status,
- "timestamp": timestamp,
- "params": params,
- "response": response,
- "error": error,
- },
- desc="upsert_room_to_delete",
- )
-
- async def delete_room_to_delete(self, room_id: str, delete_id: str) -> None:
- """Remove a room from the list of rooms to purge.
-
- Args:
- room_id: The room ID matching the delete to remove
- delete_id: The delete ID identifying the delete to remove
- """
-
- await self.db_pool.simple_delete(
- "rooms_to_delete",
- keyvalues={
- "room_id": room_id,
- "delete_id": delete_id,
- },
- desc="delete_room_to_delete",
- )
-
- async def get_rooms_to_delete(
- self, room_id: Optional[str] = None
- ) -> List[Dict[str, Any]]:
- """Returns all delete jobs. This includes those that have been
- interrupted by a stop/restart of synapse, but also scheduled ones
- like locally forgotten rooms.
-
- Args:
- room_id: if specified, will only return the delete jobs for a specific room
-
- """
- keyvalues = {}
- if room_id is not None:
- keyvalues["room_id"] = room_id
-
- return await self.db_pool.simple_select_list(
- table="rooms_to_delete",
- keyvalues=keyvalues,
- retcols=(
- "room_id",
- "delete_id",
- "action",
- "status",
- "timestamp",
- "params",
- "response",
- "error",
- ),
- desc="rooms_to_delete_fetch",
- )
-
- async def get_room_to_delete(self, delete_id: str) -> Optional[Dict[str, Any]]:
- """Return the delete job identified by delete_id."""
- return await self.db_pool.simple_select_one(
- table="rooms_to_delete",
- keyvalues={"delete_id": delete_id},
- retcols=(
- "room_id",
- "delete_id",
- "action",
- "status",
- "timestamp",
- "params",
- "response",
- "error",
- ),
- desc="rooms_to_delete_fetch",
- )
-
async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]:
"""Get all rooms that the user has ever been in.
diff --git a/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql b/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql
deleted file mode 100644
index 8f9c8c7010..0000000000
--- a/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql
+++ /dev/null
@@ -1,27 +0,0 @@
-/* Copyright 2023 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- cf upsert_room_to_delete docstring for the meaning of the fields.
-CREATE TABLE IF NOT EXISTS rooms_to_delete(
- room_id text NOT NULL,
- delete_id text NOT NULL,
- action text NOT NULL,
- status text NOT NULL,
- timestamp bigint,
- params text,
- response text,
- error text,
- UNIQUE(room_id, delete_id)
-);
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index ba8afbc2b9..6b1138acb6 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -24,12 +24,11 @@ from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import EventTypes, Membership, RoomTypes
from synapse.api.errors import Codes
-from synapse.handlers.pagination import DeleteStatus, PaginationHandler
from synapse.rest.client import directory, events, login, room
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
-from synapse.util.stringutils import random_string
+from synapse.util.task_scheduler import TaskScheduler
from tests import unittest
@@ -50,6 +49,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_creation_handler = hs.get_event_creation_handler()
+ self.task_scheduler = hs.get_task_scheduler()
hs.config.consent.user_consent_version = "1"
consent_uri_builder = Mock()
@@ -480,6 +480,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_creation_handler = hs.get_event_creation_handler()
+ self.task_scheduler = hs.get_task_scheduler()
hs.config.consent.user_consent_version = "1"
consent_uri_builder = Mock()
@@ -668,7 +669,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
delete_id1 = channel.json_body["delete_id"]
# go ahead
- self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
+ self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
# second task
channel = self.make_request(
@@ -700,7 +701,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
# get status after more than clearing time for first task
# second task is not cleared
- self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
+ self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
channel = self.make_request(
"GET",
@@ -714,7 +715,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(delete_id2, channel.json_body["results"][0]["delete_id"])
# get status after more than clearing time for all tasks
- self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
+ self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
channel = self.make_request(
"GET",
@@ -725,48 +726,6 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(404, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
- def test_delete_same_room_twice(self) -> None:
- """Test that the call for delete a room at second time gives an exception."""
-
- body = {"new_room_user_id": self.admin_user}
-
- # first call to delete room
- # and do not wait for finish the task
- first_channel = self.make_request(
- "DELETE",
- self.url.encode("ascii"),
- content=body,
- access_token=self.admin_user_tok,
- await_result=False,
- )
-
- # second call to delete room
- second_channel = self.make_request(
- "DELETE",
- self.url.encode("ascii"),
- content=body,
- access_token=self.admin_user_tok,
- )
-
- self.assertEqual(400, second_channel.code, msg=second_channel.json_body)
- self.assertEqual(Codes.UNKNOWN, second_channel.json_body["errcode"])
- self.assertEqual(
- f"Purge already in progress for {self.room_id}",
- second_channel.json_body["error"],
- )
-
- # get result of first call
- first_channel.await_result()
- self.assertEqual(200, first_channel.code, msg=first_channel.json_body)
- self.assertIn("delete_id", first_channel.json_body)
-
- # check status after finish the task
- self._test_result(
- first_channel.json_body["delete_id"],
- self.other_user,
- expect_new_room=True,
- )
-
def test_purge_room_and_block(self) -> None:
"""Test to purge a room and block it.
Members will not be moved to a new room and will not receive a message.
@@ -1005,7 +964,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self._is_purged(room_id)
- def test_resume_purge_room(self) -> None:
+ def test_scheduled_purge_room(self) -> None:
# Create a test room
room_id = self.helper.create_room_as(
self.admin_user,
@@ -1013,12 +972,12 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
)
self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
+ # Schedule a purge 10 seconds in the future
self.get_success(
- self.store.upsert_room_to_delete(
- room_id,
- random_string(16),
- DeleteStatus.ACTION_PURGE,
- DeleteStatus.STATUS_PURGING,
+ self.task_scheduler.schedule_task(
+ "purge_room",
+ resource_id=room_id,
+ timestamp=self.clock.time_msec() + 10 * 1000,
)
)
@@ -1026,38 +985,34 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
with self.assertRaises(AssertionError):
self._is_purged(room_id)
- # Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that
- # the automatic purging takes place and resumes the purge
+ # Advance one hour in the future past `TaskScheduler.SCHEDULE_INTERVAL_MS` so that
+ # the automatic purging takes place and launch the purge
self.reactor.advance(ONE_HOUR_IN_S)
self._is_purged(room_id)
- def test_resume_shutdown_room(self) -> None:
+ def test_schedule_shutdown_room(self) -> None:
# Create a test room
room_id = self.helper.create_room_as(
self.other_user,
tok=self.other_user_tok,
)
- delete_id = random_string(16)
-
- self.get_success(
- self.store.upsert_room_to_delete(
- room_id,
- delete_id,
- DeleteStatus.ACTION_SHUTDOWN,
- DeleteStatus.STATUS_SHUTTING_DOWN,
- params=json.dumps(
- {
- "requester_user_id": self.admin_user,
- "new_room_user_id": self.admin_user,
- "new_room_name": None,
- "message": None,
- "block": False,
- "purge": True,
- "force_purge": True,
- }
- ),
+ # Schedule a shutdown 10 seconds in the future
+ delete_id = self.get_success(
+ self.task_scheduler.schedule_task(
+ "shutdown_and_purge_room",
+ resource_id=room_id,
+ params={
+ "requester_user_id": self.admin_user,
+ "new_room_user_id": self.admin_user,
+ "new_room_name": None,
+ "message": None,
+ "block": False,
+ "purge": True,
+ "force_purge": True,
+ },
+ timestamp=self.clock.time_msec() + 10 * 1000,
)
)
@@ -1068,7 +1023,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
with self.assertRaises(AssertionError):
self._is_purged(room_id)
- # Advance one hour in the future past `PURGE_ROOMS_INTERVAL_MS` so that
+ # Advance one hour in the future past `TaskScheduler.SCHEDULE_INTERVAL_MS` so that
# the automatic purging takes place and resumes the purge
self.reactor.advance(ONE_HOUR_IN_S)
@@ -2081,14 +2036,11 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
# Purge every event before the second event.
- purge_id = random_string(16)
self.get_success(
- pagination_handler._purge_history(
- purge_id=purge_id,
+ pagination_handler.purge_history(
room_id=self.room_id,
token=second_token_str,
delete_local_events=True,
- update_rooms_to_delete_table=True,
)
)
diff --git a/tests/rest/admin/test_server_notice.py b/tests/rest/admin/test_server_notice.py
index d14da9fd0e..dfd14f5751 100644
--- a/tests/rest/admin/test_server_notice.py
+++ b/tests/rest/admin/test_server_notice.py
@@ -414,13 +414,12 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
self.assertEqual(messages[0]["content"]["body"], "test msg one")
self.assertEqual(messages[0]["sender"], "@notices:test")
- delete_id = random_string(16)
+ random_string(16)
# shut down and purge room
self.get_success(
self.room_shutdown_handler.shutdown_room(
first_room_id,
- delete_id,
{
"requester_user_id": self.admin_user,
"new_room_user_id": None,
@@ -432,7 +431,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
},
)
)
- self.get_success(self.pagination_handler.purge_room(first_room_id, "delete_id"))
+ self.get_success(self.pagination_handler.purge_room(first_room_id, force=False))
# user is not member anymore
self._check_invite_and_join_status(self.other_user, 0, 0)
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index 98c3f99d11..88643cca9e 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -2088,14 +2088,11 @@ class RoomMessageListTestCase(RoomBase):
self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
# Purge every event before the second event.
- purge_id = random_string(16)
self.get_success(
- pagination_handler._purge_history(
- purge_id=purge_id,
+ pagination_handler.purge_history(
room_id=self.room_id,
token=second_token_str,
delete_local_events=True,
- update_rooms_to_delete_table=True,
)
)
|