From ea20937084903864865f76e22f67d27729f2d6dc Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Fri, 19 Nov 2021 20:39:46 +0100 Subject: Add an admin API to run background jobs. (#11352) Instead of having admins poke into the database directly. Can currently run jobs to populate stats and to populate the user directory. --- synapse/rest/admin/background_updates.py | 123 ++++++++++++++++++++++++------- 1 file changed, 96 insertions(+), 27 deletions(-) (limited to 'synapse/rest/admin/background_updates.py') diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py index 0d0183bf20..479672d4d5 100644 --- a/synapse/rest/admin/background_updates.py +++ b/synapse/rest/admin/background_updates.py @@ -12,10 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from http import HTTPStatus from typing import TYPE_CHECKING, Tuple from synapse.api.errors import SynapseError -from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.servlet import ( + RestServlet, + assert_params_in_dict, + parse_json_object_from_request, +) from synapse.http.site import SynapseRequest from synapse.rest.admin._base import admin_patterns, assert_user_is_admin from synapse.types import JsonDict @@ -29,37 +34,36 @@ logger = logging.getLogger(__name__) class BackgroundUpdateEnabledRestServlet(RestServlet): """Allows temporarily disabling background updates""" - PATTERNS = admin_patterns("/background_updates/enabled") + PATTERNS = admin_patterns("/background_updates/enabled$") def __init__(self, hs: "HomeServer"): - self.group_server = hs.get_groups_server_handler() - self.is_mine_id = hs.is_mine_id - self.auth = hs.get_auth() - - self.data_stores = hs.get_datastores() + self._auth = hs.get_auth() + self._data_stores = hs.get_datastores() async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request) - await assert_user_is_admin(self.auth, requester.user) + requester = await self._auth.get_user_by_req(request) + await assert_user_is_admin(self._auth, requester.user) # We need to check that all configured databases have updates enabled. # (They *should* all be in sync.) - enabled = all(db.updates.enabled for db in self.data_stores.databases) + enabled = all(db.updates.enabled for db in self._data_stores.databases) - return 200, {"enabled": enabled} + return HTTPStatus.OK, {"enabled": enabled} async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request) - await assert_user_is_admin(self.auth, requester.user) + requester = await self._auth.get_user_by_req(request) + await assert_user_is_admin(self._auth, requester.user) body = parse_json_object_from_request(request) enabled = body.get("enabled", True) if not isinstance(enabled, bool): - raise SynapseError(400, "'enabled' parameter must be a boolean") + raise SynapseError( + HTTPStatus.BAD_REQUEST, "'enabled' parameter must be a boolean" + ) - for db in self.data_stores.databases: + for db in self._data_stores.databases: db.updates.enabled = enabled # If we're re-enabling them ensure that we start the background @@ -67,32 +71,29 @@ class BackgroundUpdateEnabledRestServlet(RestServlet): if enabled: db.updates.start_doing_background_updates() - return 200, {"enabled": enabled} + return HTTPStatus.OK, {"enabled": enabled} class BackgroundUpdateRestServlet(RestServlet): """Fetch information about background updates""" - PATTERNS = admin_patterns("/background_updates/status") + PATTERNS = admin_patterns("/background_updates/status$") def __init__(self, hs: "HomeServer"): - self.group_server = hs.get_groups_server_handler() - self.is_mine_id = hs.is_mine_id - self.auth = hs.get_auth() - - self.data_stores = hs.get_datastores() + self._auth = hs.get_auth() + self._data_stores = hs.get_datastores() async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request) - await assert_user_is_admin(self.auth, requester.user) + requester = await self._auth.get_user_by_req(request) + await assert_user_is_admin(self._auth, requester.user) # We need to check that all configured databases have updates enabled. # (They *should* all be in sync.) - enabled = all(db.updates.enabled for db in self.data_stores.databases) + enabled = all(db.updates.enabled for db in self._data_stores.databases) current_updates = {} - for db in self.data_stores.databases: + for db in self._data_stores.databases: update = db.updates.get_current_update() if not update: continue @@ -104,4 +105,72 @@ class BackgroundUpdateRestServlet(RestServlet): "average_items_per_ms": update.average_items_per_ms(), } - return 200, {"enabled": enabled, "current_updates": current_updates} + return HTTPStatus.OK, {"enabled": enabled, "current_updates": current_updates} + + +class BackgroundUpdateStartJobRestServlet(RestServlet): + """Allows to start specific background updates""" + + PATTERNS = admin_patterns("/background_updates/start_job") + + def __init__(self, hs: "HomeServer"): + self._auth = hs.get_auth() + self._store = hs.get_datastore() + + async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self._auth.get_user_by_req(request) + await assert_user_is_admin(self._auth, requester.user) + + body = parse_json_object_from_request(request) + assert_params_in_dict(body, ["job_name"]) + + job_name = body["job_name"] + + if job_name == "populate_stats_process_rooms": + jobs = [ + { + "update_name": "populate_stats_process_rooms", + "progress_json": "{}", + }, + ] + elif job_name == "regenerate_directory": + jobs = [ + { + "update_name": "populate_user_directory_createtables", + "progress_json": "{}", + "depends_on": "", + }, + { + "update_name": "populate_user_directory_process_rooms", + "progress_json": "{}", + "depends_on": "populate_user_directory_createtables", + }, + { + "update_name": "populate_user_directory_process_users", + "progress_json": "{}", + "depends_on": "populate_user_directory_process_rooms", + }, + { + "update_name": "populate_user_directory_cleanup", + "progress_json": "{}", + "depends_on": "populate_user_directory_process_users", + }, + ] + else: + raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name") + + try: + await self._store.db_pool.simple_insert_many( + table="background_updates", + values=jobs, + desc=f"admin_api_run_{job_name}", + ) + except self._store.db_pool.engine.module.IntegrityError: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Job %s is already in queue of background updates." % (job_name,), + ) + + self._store.db_pool.updates.start_doing_background_updates() + + return HTTPStatus.OK, {} -- cgit 1.4.1