summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMathieu Velten <mathieuv@matrix.org>2023-05-12 15:43:45 +0200
committerMathieu Velten <mathieuv@matrix.org>2023-05-12 15:52:41 +0200
commitd05bc56a2110aa0b67371c88b744b2b2bd293bc9 (patch)
treeb2fe4baf6da580bb9cea20690f46802c54d8220f /synapse
parentSave shutdown and purge state in DB (diff)
downloadsynapse-d05bc56a2110aa0b67371c88b744b2b2bd293bc9.tar.xz
Restore purge and shutdown from DB on startup
It will also launch scheduled purge (`wait_purge` status) hourly
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/pagination.py93
1 files changed, 81 insertions, 12 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 2ddada8a56..64792ff15a 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -24,7 +24,7 @@ from synapse.api.constants import Direction, EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
 from synapse.events.utils import SerializeEventConfig
-from synapse.handlers.room import ShutdownRoomResponse
+from synapse.handlers.room import DeleteStatus, ShutdownRoomParams, ShutdownRoomResponse
 from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.rest.admin._base import assert_user_is_admin
@@ -83,6 +83,9 @@ class PaginationHandler:
     # when to remove a completed deletion/purge from the results map
     CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24  # 24 hours
 
+    # how often to run the purge rooms loop
+    PURGE_ROOMS_PERIOD = 1000 * 3600  # 1 hour
+
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.auth = hs.get_auth()
@@ -116,6 +119,7 @@ class PaginationHandler:
         self._retention_allowed_lifetime_max = (
             hs.config.retention.retention_allowed_lifetime_max
         )
+        self._purge_retention_period = hs.config.server.purge_retention_period
         self._is_master = hs.config.worker.worker_app is None
 
         if hs.config.retention.retention_enabled and self._is_master:
@@ -132,6 +136,82 @@ class PaginationHandler:
                     job.longest_max_lifetime,
                 )
 
+        if self._is_master:
+            self.clock.looping_call(
+                run_as_background_process,
+                PaginationHandler.PURGE_ROOMS_PERIOD,
+                "purge_rooms",
+                self.purge_rooms,
+            )
+
+    async def purge_rooms(self) -> None:
+        """This takes care of restoring unfinished purge/shutdown rooms from the DB.
+        It also takes care to launch scheduled ones, like rooms that has been fully
+        forgotten.
+
+        It should be run regularly.
+        """
+        rooms_to_purge = await self.store.get_rooms_to_purge()
+        for r in rooms_to_purge:
+            room_id = r["room_id"]
+            delete_id = r["delete_id"]
+            status = r["status"]
+            timestamp = r["timestamp"]
+
+            if (
+                status == DeleteStatus.STATUS_COMPLETE
+                or status == DeleteStatus.STATUS_FAILED
+            ):
+                # remove the delete from the list 24 hours after it completes or fails
+                time_since_completed = self.clock.time_msec() - timestamp
+                if time_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS:
+                    await self.store.delete_room_to_purge(room_id, delete_id)
+
+                    del self._delete_by_id[delete_id]
+                    self._delete_by_room[room_id].remove(delete_id)
+                    if not self._delete_by_room[room_id]:
+                        del self._delete_by_room[room_id]
+
+                continue
+
+            delete_status = self._delete_by_id.get(delete_id)
+            if delete_status is not None:
+                # a delete background task is already running (or has run)
+                # for this delete id, let's ignore it
+                continue
+
+            self._delete_by_id[delete_id] = DeleteStatus()
+            self._delete_by_id[delete_id].status = status
+            self._delete_by_room.setdefault(room_id, []).append(delete_id)
+
+            # restore a shutdown from the DB
+            # also take care of purging if needed
+            shutdown_response = None
+            if status == DeleteStatus.STATUS_SHUTTING_DOWN:
+                shutdown_params = json.loads(r["shutdown_params"])
+                if r["shutdown_response"]:
+                    shutdown_response = json.loads(r["shutdown_response"])
+                await self._shutdown_and_purge_room(
+                    room_id,
+                    delete_id,
+                    shutdown_params=shutdown_params,
+                    shutdown_response=shutdown_response,
+                )
+                continue
+
+            # launch a purge from the DB
+            # it may be an interrupted purge or a scheduled one
+            purge_now = True if status == DeleteStatus.STATUS_PURGING else False
+            if status == DeleteStatus.STATUS_WAIT_PURGE:
+                if timestamp is None or self.clock.time_msec() >= timestamp:
+                    purge_now = True
+
+            # TODO 2 stages purge, keep memberships for a while so we don't "break" sync
+            if purge_now:
+                await self.purge_room(
+                    room_id, delete_id, True, shutdown_response=shutdown_response
+                )
+
     async def purge_history_for_rooms_in_range(
         self, min_ms: Optional[int], max_ms: Optional[int]
     ) -> None:
@@ -651,17 +731,6 @@ class PaginationHandler:
         finally:
             self._purges_in_progress_by_room.discard(room_id)
 
-            # remove the delete from the list 24 hours after it completes
-            def clear_delete() -> None:
-                del self._delete_by_id[delete_id]
-                self._delete_by_room[room_id].remove(delete_id)
-                if not self._delete_by_room[room_id]:
-                    del self._delete_by_room[room_id]
-
-            self.hs.get_reactor().callLater(
-                PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
-            )
-
     def start_shutdown_and_purge_room(
         self,
         room_id: str,