diff options
author | Mathieu Velten <mathieuv@matrix.org> | 2023-05-12 15:43:45 +0200 |
---|---|---|
committer | Mathieu Velten <mathieuv@matrix.org> | 2023-05-12 15:52:41 +0200 |
commit | d05bc56a2110aa0b67371c88b744b2b2bd293bc9 (patch) | |
tree | b2fe4baf6da580bb9cea20690f46802c54d8220f /synapse | |
parent | Save shutdown and purge state in DB (diff) | |
download | synapse-d05bc56a2110aa0b67371c88b744b2b2bd293bc9.tar.xz |
Restore purge and shutdown from DB on startup
It will also launch scheduled purges (rooms in `wait_purge` status) hourly.
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/pagination.py | 93 |
1 file changed, 81 insertions, 12 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 2ddada8a56..64792ff15a 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -24,7 +24,7 @@ from synapse.api.constants import Direction, EventTypes, Membership from synapse.api.errors import SynapseError from synapse.api.filtering import Filter from synapse.events.utils import SerializeEventConfig -from synapse.handlers.room import ShutdownRoomResponse +from synapse.handlers.room import DeleteStatus, ShutdownRoomParams, ShutdownRoomResponse from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.admin._base import assert_user_is_admin @@ -83,6 +83,9 @@ class PaginationHandler: # when to remove a completed deletion/purge from the results map CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours + # how often to run the purge rooms loop + PURGE_ROOMS_PERIOD = 1000 * 3600 # 1 hour + def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() @@ -116,6 +119,7 @@ class PaginationHandler: self._retention_allowed_lifetime_max = ( hs.config.retention.retention_allowed_lifetime_max ) + self._purge_retention_period = hs.config.server.purge_retention_period self._is_master = hs.config.worker.worker_app is None if hs.config.retention.retention_enabled and self._is_master: @@ -132,6 +136,82 @@ class PaginationHandler: job.longest_max_lifetime, ) + if self._is_master: + self.clock.looping_call( + run_as_background_process, + PaginationHandler.PURGE_ROOMS_PERIOD, + "purge_rooms", + self.purge_rooms, + ) + + async def purge_rooms(self) -> None: + """This takes care of restoring unfinished purge/shutdown rooms from the DB. + It also takes care to launch scheduled ones, like rooms that has been fully + forgotten. + + It should be run regularly. 
+ """ + rooms_to_purge = await self.store.get_rooms_to_purge() + for r in rooms_to_purge: + room_id = r["room_id"] + delete_id = r["delete_id"] + status = r["status"] + timestamp = r["timestamp"] + + if ( + status == DeleteStatus.STATUS_COMPLETE + or status == DeleteStatus.STATUS_FAILED + ): + # remove the delete from the list 24 hours after it completes or fails + time_since_completed = self.clock.time_msec() - timestamp + if time_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS: + await self.store.delete_room_to_purge(room_id, delete_id) + + del self._delete_by_id[delete_id] + self._delete_by_room[room_id].remove(delete_id) + if not self._delete_by_room[room_id]: + del self._delete_by_room[room_id] + + continue + + delete_status = self._delete_by_id.get(delete_id) + if delete_status is not None: + # a delete background task is already running (or has run) + # for this delete id, let's ignore it + continue + + self._delete_by_id[delete_id] = DeleteStatus() + self._delete_by_id[delete_id].status = status + self._delete_by_room.setdefault(room_id, []).append(delete_id) + + # restore a shutdown from the DB + # also take care of purging if needed + shutdown_response = None + if status == DeleteStatus.STATUS_SHUTTING_DOWN: + shutdown_params = json.loads(r["shutdown_params"]) + if r["shutdown_response"]: + shutdown_response = json.loads(r["shutdown_response"]) + await self._shutdown_and_purge_room( + room_id, + delete_id, + shutdown_params=shutdown_params, + shutdown_response=shutdown_response, + ) + continue + + # launch a purge from the DB + # it may be an interrupted purge or a scheduled one + purge_now = True if status == DeleteStatus.STATUS_PURGING else False + if status == DeleteStatus.STATUS_WAIT_PURGE: + if timestamp is None or self.clock.time_msec() >= timestamp: + purge_now = True + + # TODO 2 stages purge, keep memberships for a while so we don't "break" sync + if purge_now: + await self.purge_room( + room_id, delete_id, True, 
shutdown_response=shutdown_response + ) + async def purge_history_for_rooms_in_range( self, min_ms: Optional[int], max_ms: Optional[int] ) -> None: @@ -651,17 +731,6 @@ class PaginationHandler: finally: self._purges_in_progress_by_room.discard(room_id) - # remove the delete from the list 24 hours after it completes - def clear_delete() -> None: - del self._delete_by_id[delete_id] - self._delete_by_room[room_id].remove(delete_id) - if not self._delete_by_room[room_id]: - del self._delete_by_room[room_id] - - self.hs.get_reactor().callLater( - PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete - ) - def start_shutdown_and_purge_room( self, room_id: str, |