summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-11-08 16:08:02 +0000
committerGitHub <noreply@github.com>2021-11-08 16:08:02 +0000
commit4ee71b96377c39a2b9d060c6aafbce62fb16ccc6 (patch)
tree71a0905f4118953137fed5551393f93585092d56
parentFix typo in comment from #11255. (#11276) (diff)
downloadsynapse-4ee71b96377c39a2b9d060c6aafbce62fb16ccc6.tar.xz
Add some background update admin APIs (#11263)
Fixes #11259
-rw-r--r--changelog.d/11263.feature1
-rw-r--r--docs/SUMMARY.md1
-rw-r--r--docs/usage/administration/admin_api/background_updates.md84
-rw-r--r--synapse/rest/admin/__init__.py6
-rw-r--r--synapse/rest/admin/background_updates.py107
-rw-r--r--synapse/storage/background_updates.py65
-rw-r--r--synapse/storage/database.py4
-rw-r--r--tests/rest/admin/test_background_updates.py218
8 files changed, 468 insertions, 18 deletions
diff --git a/changelog.d/11263.feature b/changelog.d/11263.feature
new file mode 100644
index 0000000000..831e76ec9f
--- /dev/null
+++ b/changelog.d/11263.feature
@@ -0,0 +1 @@
+Add some background update admin APIs.
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index 35412ea92c..04320ab07b 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -51,6 +51,7 @@
   - [Administration](usage/administration/README.md)
     - [Admin API](usage/administration/admin_api/README.md)
       - [Account Validity](admin_api/account_validity.md)
+      - [Background Updates](usage/administration/admin_api/background_updates.md)
       - [Delete Group](admin_api/delete_group.md)
       - [Event Reports](admin_api/event_reports.md)
       - [Media](admin_api/media_admin_api.md)
diff --git a/docs/usage/administration/admin_api/background_updates.md b/docs/usage/administration/admin_api/background_updates.md
new file mode 100644
index 0000000000..b36d7fe398
--- /dev/null
+++ b/docs/usage/administration/admin_api/background_updates.md
@@ -0,0 +1,84 @@
+# Background Updates API
+
+This API allows a server administrator to manage the background updates being
+run against the database.
+
+## Status
+
+This API gets the current status of the background updates.
+
+
+The API is:
+
+```
+GET /_synapse/admin/v1/background_updates/status
+```
+
+Returning:
+
+```json
+{
+    "enabled": true,
+    "current_updates": {
+        "<db_name>": {
+            "name": "<background_update_name>",
+            "total_item_count": 50,
+            "total_duration_ms": 10000.0,
+            "average_items_per_ms": 2.2,
+        },
+    }
+}
+```
+
+`enabled` whether the background updates are enabled or disabled.
+
+`db_name` the database name (usually Synapse is configured with a single database named 'master').
+
+For each update:
+
+`name` the name of the update.
+`total_item_count` total number of "items" processed (the meaning of 'items' depends on the update in question).
+`total_duration_ms` how long the background process has been running, not including time spent sleeping.
+`average_items_per_ms` how many items are processed per millisecond based on an exponential average.
+
+
+
+## Enabled
+
+This API allow pausing background updates.
+
+Background updates should *not* be paused for significant periods of time, as
+this can affect the performance of Synapse.
+
+*Note*: This won't persist over restarts.
+
+*Note*: This won't cancel any update query that is currently running. This is
+usually fine since most queries are short lived, except for `CREATE INDEX`
+background updates which won't be cancelled once started.
+
+
+The API is:
+
+```
+POST /_synapse/admin/v1/background_updates/enabled
+```
+
+with the following body:
+
+```json
+{
+    "enabled": false
+}
+```
+
+`enabled` sets whether the background updates are enabled or disabled.
+
+The API returns the `enabled` param.
+
+```json
+{
+    "enabled": false
+}
+```
+
+There is also a `GET` version which returns the `enabled` state.
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 70514e814f..81e98f81d6 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -25,6 +25,10 @@ from synapse.http.server import HttpServer, JsonResource
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.http.site import SynapseRequest
 from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
+from synapse.rest.admin.background_updates import (
+    BackgroundUpdateEnabledRestServlet,
+    BackgroundUpdateRestServlet,
+)
 from synapse.rest.admin.devices import (
     DeleteDevicesRestServlet,
     DeviceRestServlet,
@@ -247,6 +251,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     # Some servlets only get registered for the main process.
     if hs.config.worker.worker_app is None:
         SendServerNoticeServlet(hs).register(http_server)
+        BackgroundUpdateEnabledRestServlet(hs).register(http_server)
+        BackgroundUpdateRestServlet(hs).register(http_server)
 
 
 def register_servlets_for_client_rest_resource(
diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py
new file mode 100644
index 0000000000..0d0183bf20
--- /dev/null
+++ b/synapse/rest/admin/background_updates.py
@@ -0,0 +1,107 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import TYPE_CHECKING, Tuple
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.site import SynapseRequest
+from synapse.rest.admin._base import admin_patterns, assert_user_is_admin
+from synapse.types import JsonDict
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class BackgroundUpdateEnabledRestServlet(RestServlet):
+    """Allows temporarily disabling background updates"""
+
+    PATTERNS = admin_patterns("/background_updates/enabled")
+
+    def __init__(self, hs: "HomeServer"):
+        self.group_server = hs.get_groups_server_handler()
+        self.is_mine_id = hs.is_mine_id
+        self.auth = hs.get_auth()
+
+        self.data_stores = hs.get_datastores()
+
+    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        # We need to check that all configured databases have updates enabled.
+        # (They *should* all be in sync.)
+        enabled = all(db.updates.enabled for db in self.data_stores.databases)
+
+        return 200, {"enabled": enabled}
+
+    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        body = parse_json_object_from_request(request)
+
+        enabled = body.get("enabled", True)
+
+        if not isinstance(enabled, bool):
+            raise SynapseError(400, "'enabled' parameter must be a boolean")
+
+        for db in self.data_stores.databases:
+            db.updates.enabled = enabled
+
+            # If we're re-enabling them ensure that we start the background
+            # process again.
+            if enabled:
+                db.updates.start_doing_background_updates()
+
+        return 200, {"enabled": enabled}
+
+
+class BackgroundUpdateRestServlet(RestServlet):
+    """Fetch information about background updates"""
+
+    PATTERNS = admin_patterns("/background_updates/status")
+
+    def __init__(self, hs: "HomeServer"):
+        self.group_server = hs.get_groups_server_handler()
+        self.is_mine_id = hs.is_mine_id
+        self.auth = hs.get_auth()
+
+        self.data_stores = hs.get_datastores()
+
+    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        # We need to check that all configured databases have updates enabled.
+        # (They *should* all be in sync.)
+        enabled = all(db.updates.enabled for db in self.data_stores.databases)
+
+        current_updates = {}
+
+        for db in self.data_stores.databases:
+            update = db.updates.get_current_update()
+            if not update:
+                continue
+
+            current_updates[db.name()] = {
+                "name": update.name,
+                "total_item_count": update.total_item_count,
+                "total_duration_ms": update.total_duration_ms,
+                "average_items_per_ms": update.average_items_per_ms(),
+            }
+
+        return 200, {"enabled": enabled, "current_updates": current_updates}
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 82b31d24f1..b9a8ca997e 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -100,29 +100,58 @@ class BackgroundUpdater:
         ] = {}
         self._all_done = False
 
+        # Whether we're currently running updates
+        self._running = False
+
+        # Whether background updates are enabled. This allows us to
+        # enable/disable background updates via the admin API.
+        self.enabled = True
+
+    def get_current_update(self) -> Optional[BackgroundUpdatePerformance]:
+        """Returns the current background update, if any."""
+
+        update_name = self._current_background_update
+        if not update_name:
+            return None
+
+        perf = self._background_update_performance.get(update_name)
+        if not perf:
+            perf = BackgroundUpdatePerformance(update_name)
+
+        return perf
+
     def start_doing_background_updates(self) -> None:
-        run_as_background_process("background_updates", self.run_background_updates)
+        if self.enabled:
+            run_as_background_process("background_updates", self.run_background_updates)
 
     async def run_background_updates(self, sleep: bool = True) -> None:
-        logger.info("Starting background schema updates")
-        while True:
-            if sleep:
-                await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
+        if self._running or not self.enabled:
+            return
 
-            try:
-                result = await self.do_next_background_update(
-                    self.BACKGROUND_UPDATE_DURATION_MS
-                )
-            except Exception:
-                logger.exception("Error doing update")
-            else:
-                if result:
-                    logger.info(
-                        "No more background updates to do."
-                        " Unscheduling background update task."
+        self._running = True
+
+        try:
+            logger.info("Starting background schema updates")
+            while self.enabled:
+                if sleep:
+                    await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
+
+                try:
+                    result = await self.do_next_background_update(
+                        self.BACKGROUND_UPDATE_DURATION_MS
                     )
-                    self._all_done = True
-                    return None
+                except Exception:
+                    logger.exception("Error doing update")
+                else:
+                    if result:
+                        logger.info(
+                            "No more background updates to do."
+                            " Unscheduling background update task."
+                        )
+                        self._all_done = True
+                        return None
+        finally:
+            self._running = False
 
     async def has_completed_background_updates(self) -> bool:
         """Check if all the background updates have completed
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 5c71e27518..d4cab69ebf 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -446,6 +446,10 @@ class DatabasePool:
                 self._check_safe_to_upsert,
             )
 
+    def name(self) -> str:
+        "Return the name of this database"
+        return self._database_config.name
+
     def is_running(self) -> bool:
         """Is the database pool currently running"""
         return self._db_pool.running
diff --git a/tests/rest/admin/test_background_updates.py b/tests/rest/admin/test_background_updates.py
new file mode 100644
index 0000000000..78c48db552
--- /dev/null
+++ b/tests/rest/admin/test_background_updates.py
@@ -0,0 +1,218 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse.rest.admin
+from synapse.rest.client import login
+from synapse.server import HomeServer
+
+from tests import unittest
+
+
+class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs: HomeServer):
+        self.store = hs.get_datastore()
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+
+    def _register_bg_update(self):
+        "Adds a bg update but doesn't start it"
+
+        async def _fake_update(progress, batch_size) -> int:
+            await self.clock.sleep(0.2)
+            return batch_size
+
+        self.store.db_pool.updates.register_background_update_handler(
+            "test_update",
+            _fake_update,
+        )
+
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                table="background_updates",
+                values={
+                    "update_name": "test_update",
+                    "progress_json": "{}",
+                },
+            )
+        )
+
+    def test_status_empty(self):
+        """Test the status API works."""
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/status",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        # Background updates should be enabled, but none should be running.
+        self.assertDictEqual(
+            channel.json_body, {"current_updates": {}, "enabled": True}
+        )
+
+    def test_status_bg_update(self):
+        """Test the status API works with a background update."""
+
+        # Create a new background update
+
+        self._register_bg_update()
+
+        self.store.db_pool.updates.start_doing_background_updates()
+        self.reactor.pump([1.0, 1.0])
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/status",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        # Background updates should be enabled, and one should be running.
+        self.assertDictEqual(
+            channel.json_body,
+            {
+                "current_updates": {
+                    "master": {
+                        "name": "test_update",
+                        "average_items_per_ms": 0.1,
+                        "total_duration_ms": 1000.0,
+                        "total_item_count": 100,
+                    }
+                },
+                "enabled": True,
+            },
+        )
+
+    def test_enabled(self):
+        """Test the enabled API works."""
+
+        # Create a new background update
+
+        self._register_bg_update()
+        self.store.db_pool.updates.start_doing_background_updates()
+
+        # Test that GET works and returns enabled is True.
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/enabled",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertDictEqual(channel.json_body, {"enabled": True})
+
+        # Disable the BG updates
+        channel = self.make_request(
+            "POST",
+            "/_synapse/admin/v1/background_updates/enabled",
+            content={"enabled": False},
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertDictEqual(channel.json_body, {"enabled": False})
+
+        # Advance a bit and get the current status, note this will finish the in
+        # flight background update so we call it the status API twice and check
+        # there was no change.
+        self.reactor.pump([1.0, 1.0])
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/status",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertDictEqual(
+            channel.json_body,
+            {
+                "current_updates": {
+                    "master": {
+                        "name": "test_update",
+                        "average_items_per_ms": 0.1,
+                        "total_duration_ms": 1000.0,
+                        "total_item_count": 100,
+                    }
+                },
+                "enabled": False,
+            },
+        )
+
+        # Run the reactor for a bit so the BG updates would have a chance to run
+        # if they were to.
+        self.reactor.pump([1.0, 1.0])
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/status",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        # There should be no change from the previous /status response.
+        self.assertDictEqual(
+            channel.json_body,
+            {
+                "current_updates": {
+                    "master": {
+                        "name": "test_update",
+                        "average_items_per_ms": 0.1,
+                        "total_duration_ms": 1000.0,
+                        "total_item_count": 100,
+                    }
+                },
+                "enabled": False,
+            },
+        )
+
+        # Re-enable the background updates.
+
+        channel = self.make_request(
+            "POST",
+            "/_synapse/admin/v1/background_updates/enabled",
+            content={"enabled": True},
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        self.assertDictEqual(channel.json_body, {"enabled": True})
+
+        self.reactor.pump([1.0, 1.0])
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/status",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        # Background updates should be enabled and making progress.
+        self.assertDictEqual(
+            channel.json_body,
+            {
+                "current_updates": {
+                    "master": {
+                        "name": "test_update",
+                        "average_items_per_ms": 0.1,
+                        "total_duration_ms": 2000.0,
+                        "total_item_count": 200,
+                    }
+                },
+                "enabled": True,
+            },
+        )