diff options
author | Mathieu Velten <mathieuv@matrix.org> | 2023-06-26 17:07:53 +0200 |
---|---|---|
committer | Mathieu Velten <mathieuv@matrix.org> | 2023-06-30 15:58:11 +0200 |
commit | 5bce6397aa477f55a2eb89a44ee449bdc486f8c4 (patch) | |
tree | f9d38d81f56de6d4e2ea89b65343d891250aee45 /synapse | |
parent | comments (diff) | |
download | synapse-5bce6397aa477f55a2eb89a44ee449bdc486f8c4.tar.xz |
Use DB for all purge/shutdown actions, including purge history
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/pagination.py | 239 | ||||
-rw-r--r-- | synapse/handlers/room.py | 45 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 15 | ||||
-rw-r--r-- | synapse/rest/admin/__init__.py | 5 | ||||
-rw-r--r-- | synapse/rest/admin/rooms.py | 20 | ||||
-rw-r--r-- | synapse/storage/databases/main/roommember.py | 80 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql (renamed from synapse/storage/schema/main/delta/78/03_rooms_to_purge.sql) | 11 |
7 files changed, 238 insertions, 177 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 4ed4c301e2..adedff94bd 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -14,9 +14,7 @@ # limitations under the License. import json import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Set - -import attr +from typing import TYPE_CHECKING, List, Optional, Set from twisted.python.failure import Failure @@ -29,7 +27,7 @@ from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.admin._base import assert_user_is_admin from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType +from synapse.types import JsonDict, Requester, StreamKeyType from synapse.types.state import StateFilter from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string @@ -42,37 +40,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -@attr.s(slots=True, auto_attribs=True) -class PurgeStatus: - """Object tracking the status of a purge request - - This class contains information on the progress of a purge request, for - return by get_purge_status. - """ - - STATUS_ACTIVE = 0 - STATUS_COMPLETE = 1 - STATUS_FAILED = 2 - - STATUS_TEXT = { - STATUS_ACTIVE: "active", - STATUS_COMPLETE: "complete", - STATUS_FAILED: "failed", - } - - # Save the error message if an error occurs - error: str = "" - - # Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}. - status: int = STATUS_ACTIVE - - def asdict(self) -> JsonDict: - ret = {"status": PurgeStatus.STATUS_TEXT[self.status]} - if self.error: - ret["error"] = self.error - return ret - - class PaginationHandler: """Handles pagination and purge history requests. @@ -100,13 +67,6 @@ class PaginationHandler: self.pagination_lock = ReadWriteLock() # IDs of rooms in which there currently an active purge *or delete* operation. self._purges_in_progress_by_room: Set[str] = set() - # map from purge id to PurgeStatus - self._purges_by_id: Dict[str, PurgeStatus] = {} - # map from purge id to DeleteStatus - self._delete_by_id: Dict[str, DeleteStatus] = {} - # map from room id to delete ids - # Dict[`room_id`, List[`delete_id`]] - self._delete_by_room: Dict[str, List[str]] = {} self._event_serializer = hs.get_event_client_serializer() self._retention_default_max_lifetime = ( @@ -151,11 +111,12 @@ class PaginationHandler: It should be run regularly. """ - rooms_to_purge = await self.store.get_rooms_to_purge() - for r in rooms_to_purge: + rooms_to_delete = await self.store.get_rooms_to_delete() + for r in rooms_to_delete: room_id = r["room_id"] delete_id = r["delete_id"] status = r["status"] + action = r["action"] timestamp = r["timestamp"] if ( @@ -165,32 +126,25 @@ class PaginationHandler: # remove the delete from the list 24 hours after it completes or fails ms_since_completed = self.clock.time_msec() - timestamp if ms_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS: - await self.store.delete_room_to_purge(room_id, delete_id) - - del self._delete_by_id[delete_id] - self._delete_by_room[room_id].remove(delete_id) - if not self._delete_by_room[room_id]: - del self._delete_by_room[room_id] + await self.store.delete_room_to_delete(room_id, delete_id) continue - delete_status = self._delete_by_id.get(delete_id) - if delete_status is not None: + if room_id in self._purges_in_progress_by_room: # a delete background task is already running (or has run) - # for this delete id, let's ignore it + # for this room id, let's ignore it for now continue - self._delete_by_id[delete_id] = DeleteStatus() - self._delete_by_id[delete_id].status = status - self._delete_by_room.setdefault(room_id, []).append(delete_id) - # If the database says we were last in the middle of shutting down the room, # let's continue the shutdown process. shutdown_response = None - if status == DeleteStatus.STATUS_SHUTTING_DOWN: - shutdown_params = json.loads(r["shutdown_params"]) - if r["shutdown_response"]: - shutdown_response = json.loads(r["shutdown_response"]) + if ( + action == DeleteStatus.ACTION_SHUTDOWN + and status == DeleteStatus.STATUS_SHUTTING_DOWN + ): + shutdown_params = json.loads(r["params"]) + if r["response"]: + shutdown_response = json.loads(r["response"]) await self._shutdown_and_purge_room( room_id, delete_id, @@ -204,16 +158,33 @@ class PaginationHandler: if status == DeleteStatus.STATUS_PURGING: purge_now = True # Or if we're at or past the scheduled purge time, let's start that one as well - elif status == DeleteStatus.STATUS_SCHEDULED_PURGE and ( + elif status == DeleteStatus.STATUS_SCHEDULED and ( timestamp is None or self.clock.time_msec() >= timestamp ): purge_now = True # TODO 2 stages purge, keep memberships for a while so we don't "break" sync if purge_now: - await self.purge_room( - room_id, delete_id, True, shutdown_response=shutdown_response - ) + params = {} + if r["params"]: + params = json.loads(r["params"]) + + if action == DeleteStatus.ACTION_PURGE_HISTORY: + if "token" in params: + await self._purge_history( + delete_id, + room_id, + params["token"], + params.get("delete_local_events", False), + True, + ) + elif action == DeleteStatus.ACTION_PURGE: + await self.purge_room( + room_id, + delete_id, + params.get("force", False), + shutdown_response=shutdown_response, + ) async def purge_history_for_rooms_in_range( self, min_ms: Optional[int], max_ms: Optional[int] @@ -323,8 +294,6 @@ class PaginationHandler: purge_id = random_string(16) - self._purges_by_id[purge_id] = PurgeStatus() - logger.info( "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id) ) @@ -339,9 +308,10 @@ class PaginationHandler: room_id, token, True, + False, ) - def start_purge_history( + async def start_purge_history( self, room_id: str, token: str, delete_local_events: bool = False ) -> str: """Start off a history purge on a room. @@ -366,7 +336,16 @@ class PaginationHandler: # request id in the log lines. logger.info("[purge] starting purge_id %s", purge_id) - self._purges_by_id[purge_id] = PurgeStatus() + await self.store.upsert_room_to_delete( + room_id, + purge_id, + DeleteStatus.ACTION_PURGE_HISTORY, + DeleteStatus.STATUS_PURGING, + params=json.dumps( + {"token": token, "delete_local_events": delete_local_events} + ), + ) + run_as_background_process( "purge_history", self._purge_history, @@ -374,11 +353,17 @@ class PaginationHandler: room_id, token, delete_local_events, + True, ) return purge_id async def _purge_history( - self, purge_id: str, room_id: str, token: str, delete_local_events: bool + self, + purge_id: str, + room_id: str, + token: str, + delete_local_events: bool, + update_rooms_to_delete_table: bool, ) -> None: """Carry out a history purge on a room. @@ -387,6 +372,10 @@ class PaginationHandler: room_id: The room to purge from token: topological token to delete events before delete_local_events: True to delete local events as well as remote ones + update_rooms_to_delete_table: True if we don't want to update/persist this + purge history action to the DB to be restorable. Used with the retention + functionality since we don't need to explicitly restore those, they + will be relaunch by the retention logic. """ self._purges_in_progress_by_room.add(room_id) try: @@ -395,48 +384,75 @@ class PaginationHandler: room_id, token, delete_local_events ) logger.info("[purge] complete") - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE + if update_rooms_to_delete_table: + await self.store.upsert_room_to_delete( + room_id, + purge_id, + DeleteStatus.ACTION_PURGE_HISTORY, + DeleteStatus.STATUS_COMPLETE, + timestamp=self.clock.time_msec(), + ) except Exception: f = Failure() logger.error( "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject()) ) - self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED - self._purges_by_id[purge_id].error = f.getErrorMessage() + if update_rooms_to_delete_table: + await self.store.upsert_room_to_delete( + room_id, + purge_id, + DeleteStatus.ACTION_PURGE_HISTORY, + DeleteStatus.STATUS_FAILED, + error=f.getErrorMessage(), + timestamp=self.clock.time_msec(), + ) finally: self._purges_in_progress_by_room.discard(room_id) - # remove the purge from the list 24 hours after it completes - def clear_purge() -> None: - del self._purges_by_id[purge_id] - - self.hs.get_reactor().callLater( - PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge - ) - - def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]: - """Get the current status of an active purge + if update_rooms_to_delete_table: + # remove the purge from the list 24 hours after it completes + async def clear_purge() -> None: + await self.store.delete_room_to_delete(room_id, purge_id) - Args: - purge_id: purge_id returned by start_purge_history - """ - return self._purges_by_id.get(purge_id) + self.hs.get_reactor().callLater( + PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge + ) - def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]: + async def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]: """Get the current status of an active deleting Args: delete_id: delete_id returned by start_shutdown_and_purge_room + or start_purge_history. """ - return self._delete_by_id.get(delete_id) - - def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]: - """Get all active delete ids by room + res = await self.store.get_room_to_delete(delete_id) + if res: + status = DeleteStatus() + status.delete_id = res["delete_id"] + status.action = res["action"] + status.status = res["status"] + if status.action == DeleteStatus.ACTION_SHUTDOWN and res["response"]: + status.shutdown_room = json.loads(res["response"]) + return status + return None + + async def get_delete_statuses_by_room(self, room_id: str) -> List[DeleteStatus]: + """Get all active delete statuses by room Args: room_id: room_id that is deleted """ - return self._delete_by_room.get(room_id) + res = await self.store.get_rooms_to_delete(room_id) + statuses = [] + for r in res: + status = DeleteStatus() + status.delete_id = r["delete_id"] + status.action = r["action"] + status.status = r["status"] + if status.action == DeleteStatus.ACTION_SHUTDOWN and r["response"]: + status.shutdown_room = json.loads(r["response"]) + statuses.append(status) + return statuses async def purge_room( self, @@ -455,6 +471,10 @@ class PaginationHandler: """ logger.info("starting purge room_id=%s force=%s", room_id, force) + action = DeleteStatus.ACTION_PURGE + if shutdown_response: + action = DeleteStatus.ACTION_SHUTDOWN + async with self.pagination_lock.write(room_id): # first check that we have no users in this room if not force: @@ -462,21 +482,23 @@ class PaginationHandler: if joined: raise SynapseError(400, "Users are still joined to this room") - await self.store.upsert_room_to_purge( + await self.store.upsert_room_to_delete( room_id, delete_id, + action, DeleteStatus.STATUS_PURGING, - shutdown_response=json.dumps(shutdown_response), + response=json.dumps(shutdown_response), ) await self._storage_controllers.purge_events.purge_room(room_id) - await self.store.upsert_room_to_purge( + await self.store.upsert_room_to_delete( room_id, delete_id, + action, DeleteStatus.STATUS_COMPLETE, timestamp=self.clock.time_msec(), - shutdown_response=json.dumps(shutdown_response), + response=json.dumps(shutdown_response), ) logger.info("purge complete for room_id %s", room_id) @@ -691,44 +713,41 @@ class PaginationHandler: self._purges_in_progress_by_room.add(room_id) try: - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN shutdown_response = await self._room_shutdown_handler.shutdown_room( room_id=room_id, delete_id=delete_id, shutdown_params=shutdown_params, shutdown_response=shutdown_response, ) - self._delete_by_id[delete_id].shutdown_room = shutdown_response if shutdown_params["purge"]: - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING await self.purge_room( room_id, delete_id, shutdown_params["force_purge"], - shutdown_response=self._delete_by_id[delete_id].shutdown_room, + shutdown_response=shutdown_response, ) - await self.store.upsert_room_to_purge( + await self.store.upsert_room_to_delete( room_id, delete_id, + DeleteStatus.ACTION_SHUTDOWN, DeleteStatus.STATUS_COMPLETE, timestamp=self.clock.time_msec(), - shutdown_response=json.dumps(shutdown_response), + response=json.dumps(shutdown_response), ) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE except Exception: f = Failure() logger.error( "failed", exc_info=(f.type, f.value, f.getTracebackObject()), ) - self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED - self._delete_by_id[delete_id].error = f.getErrorMessage() - await self.store.upsert_room_to_purge( + await self.store.upsert_room_to_delete( room_id, delete_id, + DeleteStatus.ACTION_SHUTDOWN, DeleteStatus.STATUS_FAILED, + timestamp=self.clock.time_msec(), error=f.getErrorMessage(), ) finally: @@ -749,9 +768,7 @@ class PaginationHandler: unique ID for this delete transaction. """ if room_id in self._purges_in_progress_by_room: - raise SynapseError( - 400, "History purge already in progress for %s" % (room_id,) - ) + raise SynapseError(400, "Purge already in progress for %s" % (room_id,)) # This check is double to `RoomShutdownHandler.shutdown_room` # But here the requester get a direct response / error with HTTP request @@ -773,8 +790,6 @@ class PaginationHandler: delete_id, ) - self._delete_by_id[delete_id] = DeleteStatus() - self._delete_by_room.setdefault(room_id, []).append(delete_id) run_as_background_process( "shutdown_and_purge_room", self._shutdown_and_purge_room, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0f04038068..27e3d1d739 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1815,13 +1815,21 @@ class DeleteStatus: return by get_delete_status. """ + ACTION_SHUTDOWN = "shutdown" + ACTION_PURGE = "purge" + ACTION_PURGE_HISTORY = "purge_history" + + # Scheduled delete waiting to be launch at a specific time + STATUS_SCHEDULED = "scheduled" STATUS_SHUTTING_DOWN = "shutting_down" - # Scheduled purge waiting to be launch at a specific time - STATUS_SCHEDULED_PURGE = "scheduled_purge" STATUS_PURGING = "purging" STATUS_COMPLETE = "complete" STATUS_FAILED = "failed" + delete_id: str = "" + + action: str = ACTION_PURGE + # Tracks whether this request has completed. # One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN,WAIT_PURGE}. status: str = STATUS_PURGING @@ -1839,6 +1847,7 @@ class DeleteStatus: def asdict(self) -> JsonDict: ret = { + "delete_id": self.delete_id, "status": self.status, "shutdown_room": self.shutdown_room, } @@ -1925,12 +1934,13 @@ class RoomShutdownHandler: "new_room_id": None, } - await self.store.upsert_room_to_purge( + await self.store.upsert_room_to_delete( room_id, delete_id, + DeleteStatus.ACTION_SHUTDOWN, DeleteStatus.STATUS_SHUTTING_DOWN, - shutdown_params=json.dumps(shutdown_params), - shutdown_response=json.dumps(shutdown_response), + params=json.dumps(shutdown_params), + response=json.dumps(shutdown_response), ) # Action the block first (even if the room doesn't exist yet) @@ -1965,12 +1975,13 @@ class RoomShutdownHandler: ) shutdown_response["new_room_id"] = new_room_id - await self.store.upsert_room_to_purge( + await self.store.upsert_room_to_delete( room_id, delete_id, + DeleteStatus.ACTION_SHUTDOWN, DeleteStatus.STATUS_SHUTTING_DOWN, - shutdown_params=json.dumps(shutdown_params), - shutdown_response=json.dumps(shutdown_response), + params=json.dumps(shutdown_params), + response=json.dumps(shutdown_response), ) logger.info( @@ -2017,7 +2028,9 @@ class RoomShutdownHandler: stream_id, ) - await self.room_member_handler.forget(target_requester.user, room_id) + await self.room_member_handler.forget( + target_requester.user, room_id, do_not_schedule_purge=True + ) # Join users to new room if new_room_user_id: @@ -2033,24 +2046,26 @@ class RoomShutdownHandler: ) shutdown_response["kicked_users"].append(user_id) - await self.store.upsert_room_to_purge( + await self.store.upsert_room_to_delete( room_id, delete_id, + DeleteStatus.ACTION_SHUTDOWN, DeleteStatus.STATUS_SHUTTING_DOWN, - shutdown_params=json.dumps(shutdown_params), - shutdown_response=json.dumps(shutdown_response), + params=json.dumps(shutdown_params), + response=json.dumps(shutdown_response), ) except Exception: logger.exception( "Failed to leave old room and join new room for %r", user_id ) shutdown_response["failed_to_kick_users"].append(user_id) - await self.store.upsert_room_to_purge( + await self.store.upsert_room_to_delete( room_id, delete_id, + DeleteStatus.ACTION_SHUTDOWN, DeleteStatus.STATUS_SHUTTING_DOWN, - shutdown_params=json.dumps(shutdown_params), - shutdown_response=json.dumps(shutdown_response), + params=json.dumps(shutdown_params), + response=json.dumps(shutdown_response), ) # Send message in new room and move aliases diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 27f1e3a615..170c071b50 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -289,7 +289,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): """ raise NotImplementedError() - async def forget(self, user: UserID, room_id: str) -> None: + async def forget( + self, user: UserID, room_id: str, do_not_schedule_purge: bool = False + ) -> None: user_id = user.to_string() member = await self._storage_controllers.state.get_current_state_event( @@ -311,14 +313,17 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # If everyone locally has left the room, then there is no reason for us to keep the # room around and we automatically purge room after a little bit - if self._purge_retention_period and await self.store.is_locally_forgotten_room( - room_id + if ( + not do_not_schedule_purge + and self._purge_retention_period + and await self.store.is_locally_forgotten_room(room_id) ): delete_id = random_string(16) - await self.store.upsert_room_to_purge( + await self.store.upsert_room_to_delete( room_id, delete_id, - DeleteStatus.STATUS_SCHEDULED_PURGE, + DeleteStatus.ACTION_PURGE, + DeleteStatus.STATUS_SCHEDULED, timestamp=self.clock.time_msec() + self._purge_retention_period, ) diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index fe8177ed4d..dbdcef7ae6 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -196,7 +196,7 @@ class PurgeHistoryRestServlet(RestServlet): errcode=Codes.BAD_JSON, ) - purge_id = self.pagination_handler.start_purge_history( + purge_id = await self.pagination_handler.start_purge_history( room_id, token, delete_local_events=delete_local_events ) @@ -215,10 +215,11 @@ class PurgeHistoryStatusRestServlet(RestServlet): ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) - purge_status = self.pagination_handler.get_purge_status(purge_id) + purge_status = await self.pagination_handler.get_delete_status(purge_id) if purge_status is None: raise NotFoundError("purge id '%s' not found" % purge_id) + # TODO active vs purging etc return HTTPStatus.OK, purge_status.asdict() diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 5e4b0b4759..ed54c8bb64 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -154,20 +154,14 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet): HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,) ) - delete_ids = self._pagination_handler.get_delete_ids_by_room(room_id) - if delete_ids is None: - delete_ids = [] + delete_statuses = await self._pagination_handler.get_delete_statuses_by_room( + room_id + ) response = [] - for delete_id in delete_ids: - delete = self._pagination_handler.get_delete_status(delete_id) - if delete and delete.status != DeleteStatus.STATUS_SCHEDULED_PURGE: - response += [ - { - "delete_id": delete_id, - **delete.asdict(), - } - ] + for delete_status in delete_statuses: + if delete_status.status != DeleteStatus.STATUS_SCHEDULED: + response += [delete_status.asdict()] if response: return HTTPStatus.OK, {"results": cast(JsonDict, response)} @@ -189,7 +183,7 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet): ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self._auth, request) - delete_status = self._pagination_handler.get_delete_status(delete_id) + delete_status = await self._pagination_handler.get_delete_status(delete_id) if delete_status is None: raise NotFoundError("delete id '%s' not found" % delete_id) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index a4d806828f..2c0b573d4c 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1284,47 +1284,48 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): # If any rows still exist it means someone has not forgotten this room yet return not rows[0][0] - async def upsert_room_to_purge( + async def upsert_room_to_delete( self, room_id: str, delete_id: str, + action: str, status: str, - error: Optional[str] = None, timestamp: Optional[int] = None, - shutdown_params: Optional[str] = None, - shutdown_response: Optional[str] = None, + params: Optional[str] = None, + response: Optional[str] = None, + error: Optional[str] = None, ) -> None: """Insert or update a room to shutdown/purge. Args: room_id: The room ID to shutdown/purge delete_id: The delete ID identifying this action + action: the type of job, mainly `shutdown` `purge` or `purge_history` status: Current status of the delete. Cf `DeleteStatus` for possible values - error: Error message to return, if any timestamp: Time of the last update. If status is `wait_purge`, then it specifies when to do the purge, with an empty value specifying ASAP - shutdown_params: JSON representation of shutdown parameters, cf `ShutdownRoomParams` - shutdown_response: JSON representation of shutdown current status, cf `ShutdownRoomResponse` + error: Error message to return, if any + params: JSON representation of delete job parameters + response: JSON representation of delete current status """ await self.db_pool.simple_upsert( - "rooms_to_purge", + "rooms_to_delete", { "room_id": room_id, "delete_id": delete_id, }, { - "room_id": room_id, - "delete_id": delete_id, + "action": action, "status": status, - "error": error, "timestamp": timestamp, - "shutdown_params": shutdown_params, - "shutdown_response": shutdown_response, + "params": params, + "response": response, + "error": error, }, - desc="upsert_room_to_purge", + desc="upsert_room_to_delete", ) - async def delete_room_to_purge(self, room_id: str, delete_id: str) -> None: + async def delete_room_to_delete(self, room_id: str, delete_id: str) -> None: """Remove a room from the list of rooms to purge. Args: @@ -1333,32 +1334,61 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): """ await self.db_pool.simple_delete( - "rooms_to_purge", + "rooms_to_delete", keyvalues={ "room_id": room_id, "delete_id": delete_id, }, - desc="delete_room_to_purge", + desc="delete_room_to_delete", ) - async def get_rooms_to_purge(self) -> List[Dict[str, Any]]: - """Returns all rooms to shutdown/purge. This includes those that have - been interrupted by a stop/restart of synapse, but also scheduled ones + async def get_rooms_to_delete( + self, room_id: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Returns all delete jobs. This includes those that have been + interrupted by a stop/restart of synapse, but also scheduled ones like locally forgotten rooms. + + Args: + room_id: if specified, will only return the delete jobs for a specific room + """ + keyvalues = {} + if room_id is not None: + keyvalues["room_id"] = room_id + return await self.db_pool.simple_select_list( - table="rooms_to_purge", - keyvalues={}, + table="rooms_to_delete", + keyvalues=keyvalues, retcols=( "room_id", "delete_id", + "action", + "status", "timestamp", + "params", + "response", + "error", + ), + desc="rooms_to_delete_fetch", + ) + + async def get_room_to_delete(self, delete_id: str) -> Optional[Dict[str, Any]]: + """Return the delete job identified by delete_id.""" + return await self.db_pool.simple_select_one( + table="rooms_to_delete", + keyvalues={"delete_id": delete_id}, + retcols=( + "room_id", + "delete_id", + "action", "status", + "timestamp", + "params", + "response", "error", - "shutdown_params", - "shutdown_response", ), - desc="rooms_to_purge_fetch", + desc="rooms_to_delete_fetch", ) async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]: diff --git a/synapse/storage/schema/main/delta/78/03_rooms_to_purge.sql b/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql index ef541e11f4..8f9c8c7010 100644 --- a/synapse/storage/schema/main/delta/78/03_rooms_to_purge.sql +++ b/synapse/storage/schema/main/delta/78/03_rooms_to_delete.sql @@ -13,14 +13,15 @@ * limitations under the License. */ --- cf upsert_room_to_purge docstring for the meaning of the fields. -CREATE TABLE IF NOT EXISTS rooms_to_purge( +-- cf upsert_room_to_delete docstring for the meaning of the fields. +CREATE TABLE IF NOT EXISTS rooms_to_delete( room_id text NOT NULL, delete_id text NOT NULL, + action text NOT NULL, status text NOT NULL, - error text, timestamp bigint, - shutdown_params text, - shutdown_response text, + params text, + response text, + error text, UNIQUE(room_id, delete_id) ); |