diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/rest/admin/__init__.py | 6 | ||||
-rw-r--r-- | synapse/rest/admin/background_updates.py | 107 | ||||
-rw-r--r-- | synapse/storage/background_updates.py | 65 | ||||
-rw-r--r-- | synapse/storage/database.py | 4 |
4 files changed, 164 insertions, 18 deletions
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 70514e814f..81e98f81d6 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -25,6 +25,10 @@ from synapse.http.server import HttpServer, JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseRequest from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin +from synapse.rest.admin.background_updates import ( + BackgroundUpdateEnabledRestServlet, + BackgroundUpdateRestServlet, +) from synapse.rest.admin.devices import ( DeleteDevicesRestServlet, DeviceRestServlet, @@ -247,6 +251,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: # Some servlets only get registered for the main process. if hs.config.worker.worker_app is None: SendServerNoticeServlet(hs).register(http_server) + BackgroundUpdateEnabledRestServlet(hs).register(http_server) + BackgroundUpdateRestServlet(hs).register(http_server) def register_servlets_for_client_rest_resource( diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py new file mode 100644 index 0000000000..0d0183bf20 --- /dev/null +++ b/synapse/rest/admin/background_updates.py @@ -0,0 +1,107 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from typing import TYPE_CHECKING, Tuple + +from synapse.api.errors import SynapseError +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.site import SynapseRequest +from synapse.rest.admin._base import admin_patterns, assert_user_is_admin +from synapse.types import JsonDict + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class BackgroundUpdateEnabledRestServlet(RestServlet): + """Allows temporarily disabling background updates""" + + PATTERNS = admin_patterns("/background_updates/enabled") + + def __init__(self, hs: "HomeServer"): + self.group_server = hs.get_groups_server_handler() + self.is_mine_id = hs.is_mine_id + self.auth = hs.get_auth() + + self.data_stores = hs.get_datastores() + + async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + await assert_user_is_admin(self.auth, requester.user) + + # We need to check that all configured databases have updates enabled. + # (They *should* all be in sync.) + enabled = all(db.updates.enabled for db in self.data_stores.databases) + + return 200, {"enabled": enabled} + + async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + await assert_user_is_admin(self.auth, requester.user) + + body = parse_json_object_from_request(request) + + enabled = body.get("enabled", True) + + if not isinstance(enabled, bool): + raise SynapseError(400, "'enabled' parameter must be a boolean") + + for db in self.data_stores.databases: + db.updates.enabled = enabled + + # If we're re-enabling them ensure that we start the background + # process again. + if enabled: + db.updates.start_doing_background_updates() + + return 200, {"enabled": enabled} + + +class BackgroundUpdateRestServlet(RestServlet): + """Fetch information about background updates""" + + PATTERNS = admin_patterns("/background_updates/status") + + def __init__(self, hs: "HomeServer"): + self.group_server = hs.get_groups_server_handler() + self.is_mine_id = hs.is_mine_id + self.auth = hs.get_auth() + + self.data_stores = hs.get_datastores() + + async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + await assert_user_is_admin(self.auth, requester.user) + + # We need to check that all configured databases have updates enabled. + # (They *should* all be in sync.) + enabled = all(db.updates.enabled for db in self.data_stores.databases) + + current_updates = {} + + for db in self.data_stores.databases: + update = db.updates.get_current_update() + if not update: + continue + + current_updates[db.name()] = { + "name": update.name, + "total_item_count": update.total_item_count, + "total_duration_ms": update.total_duration_ms, + "average_items_per_ms": update.average_items_per_ms(), + } + + return 200, {"enabled": enabled, "current_updates": current_updates} diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 82b31d24f1..b9a8ca997e 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -100,29 +100,58 @@ class BackgroundUpdater: ] = {} self._all_done = False + # Whether we're currently running updates + self._running = False + + # Whether background updates are enabled. This allows us to + # enable/disable background updates via the admin API. + self.enabled = True + + def get_current_update(self) -> Optional[BackgroundUpdatePerformance]: + """Returns the current background update, if any.""" + + update_name = self._current_background_update + if not update_name: + return None + + perf = self._background_update_performance.get(update_name) + if not perf: + perf = BackgroundUpdatePerformance(update_name) + + return perf + def start_doing_background_updates(self) -> None: - run_as_background_process("background_updates", self.run_background_updates) + if self.enabled: + run_as_background_process("background_updates", self.run_background_updates) async def run_background_updates(self, sleep: bool = True) -> None: - logger.info("Starting background schema updates") - while True: - if sleep: - await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) + if self._running or not self.enabled: + return - try: - result = await self.do_next_background_update( - self.BACKGROUND_UPDATE_DURATION_MS - ) - except Exception: - logger.exception("Error doing update") - else: - if result: - logger.info( - "No more background updates to do." - " Unscheduling background update task." + self._running = True + + try: + logger.info("Starting background schema updates") + while self.enabled: + if sleep: + await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) + + try: + result = await self.do_next_background_update( + self.BACKGROUND_UPDATE_DURATION_MS ) - self._all_done = True - return None + except Exception: + logger.exception("Error doing update") + else: + if result: + logger.info( + "No more background updates to do." + " Unscheduling background update task." + ) + self._all_done = True + return None + finally: + self._running = False async def has_completed_background_updates(self) -> bool: """Check if all the background updates have completed diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 5c71e27518..d4cab69ebf 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -446,6 +446,10 @@ class DatabasePool: self._check_safe_to_upsert, ) + def name(self) -> str: + "Return the name of this database" + return self._database_config.name + def is_running(self) -> bool: """Is the database pool currently running""" return self._db_pool.running |