summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/11352.feature1
-rw-r--r--docs/sample_config.yaml4
-rw-r--r--docs/usage/administration/admin_api/background_updates.md27
-rw-r--r--docs/user_directory.md6
-rw-r--r--synapse/config/user_directory.py4
-rw-r--r--synapse/rest/admin/__init__.py2
-rw-r--r--synapse/rest/admin/background_updates.py123
-rw-r--r--synapse/storage/background_updates.py2
-rw-r--r--tests/rest/admin/test_background_updates.py154
9 files changed, 280 insertions, 43 deletions
diff --git a/changelog.d/11352.feature b/changelog.d/11352.feature
new file mode 100644
index 0000000000..a4d01b3549
--- /dev/null
+++ b/changelog.d/11352.feature
@@ -0,0 +1 @@
+Add admin API to run background jobs.
\ No newline at end of file
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 3c931468aa..aee300013f 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -2360,8 +2360,8 @@ user_directory:
     # indexes were (re)built was before Synapse 1.44, you'll have to
     # rebuild the indexes in order to search through all known users.
     # These indexes are built the first time Synapse starts; admins can
-    # manually trigger a rebuild following the instructions at
-    #     https://matrix-org.github.io/synapse/latest/user_directory.html
+    # manually trigger a rebuild via API following the instructions at
+    #     https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run
     #
     # Uncomment to return search results containing all known users, even if that
     # user does not share a room with the requester.
diff --git a/docs/usage/administration/admin_api/background_updates.md b/docs/usage/administration/admin_api/background_updates.md
index b36d7fe398..9f6ac7d567 100644
--- a/docs/usage/administration/admin_api/background_updates.md
+++ b/docs/usage/administration/admin_api/background_updates.md
@@ -42,7 +42,6 @@ For each update:
 `average_items_per_ms` how many items are processed per millisecond based on an exponential average.
 
 
-
 ## Enabled
 
 This API allow pausing background updates.
@@ -82,3 +81,29 @@ The API returns the `enabled` param.
 ```
 
 There is also a `GET` version which returns the `enabled` state.
+
+
+## Run
+
+This API schedules a specific background update to run. The job starts immediately after calling the API.
+
+
+The API is:
+
+```
+POST /_synapse/admin/v1/background_updates/start_job
+```
+
+with the following body:
+
+```json
+{
+    "job_name": "populate_stats_process_rooms"
+}
+```
+
+The following JSON body parameters are available:
+
+- `job_name` - A string which job to run. Valid values are:
+  - `populate_stats_process_rooms` - Recalculate the stats for all rooms.
+  - `regenerate_directory` - Recalculate the [user directory](../../../user_directory.md) if it is stale or out of sync.
diff --git a/docs/user_directory.md b/docs/user_directory.md
index 07fe954891..c4794b04cf 100644
--- a/docs/user_directory.md
+++ b/docs/user_directory.md
@@ -6,9 +6,9 @@ on this particular server - i.e. ones which your account shares a room with, or
 who are present in a publicly viewable room present on the server.
 
 The directory info is stored in various tables, which can (typically after
-DB corruption) get stale or out of sync.  If this happens, for now the
-solution to fix it is to execute the SQL [here](https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/main/delta/53/user_dir_populate.sql)
-and then restart synapse. This should then start a background task to
+DB corruption) get stale or out of sync. If this happens, for now the
+solution to fix it is to use the [admin API](usage/administration/admin_api/background_updates.md#run)
+and execute the job `regenerate_directory`. This should then start a background task to
 flush the current tables and regenerate the directory.
 
 Data model
diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py
index 2552f688d0..6d6678c7e4 100644
--- a/synapse/config/user_directory.py
+++ b/synapse/config/user_directory.py
@@ -53,8 +53,8 @@ class UserDirectoryConfig(Config):
             # indexes were (re)built was before Synapse 1.44, you'll have to
             # rebuild the indexes in order to search through all known users.
             # These indexes are built the first time Synapse starts; admins can
-            # manually trigger a rebuild following the instructions at
-            #     https://matrix-org.github.io/synapse/latest/user_directory.html
+            # manually trigger a rebuild via API following the instructions at
+            #     https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run
             #
             # Uncomment to return search results containing all known users, even if that
             # user does not share a room with the requester.
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 65b76fa10c..ee4a5e481b 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -28,6 +28,7 @@ from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
 from synapse.rest.admin.background_updates import (
     BackgroundUpdateEnabledRestServlet,
     BackgroundUpdateRestServlet,
+    BackgroundUpdateStartJobRestServlet,
 )
 from synapse.rest.admin.devices import (
     DeleteDevicesRestServlet,
@@ -261,6 +262,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
         SendServerNoticeServlet(hs).register(http_server)
         BackgroundUpdateEnabledRestServlet(hs).register(http_server)
         BackgroundUpdateRestServlet(hs).register(http_server)
+        BackgroundUpdateStartJobRestServlet(hs).register(http_server)
 
 
 def register_servlets_for_client_rest_resource(
diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py
index 0d0183bf20..479672d4d5 100644
--- a/synapse/rest/admin/background_updates.py
+++ b/synapse/rest/admin/background_updates.py
@@ -12,10 +12,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from http import HTTPStatus
 from typing import TYPE_CHECKING, Tuple
 
 from synapse.api.errors import SynapseError
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import (
+    RestServlet,
+    assert_params_in_dict,
+    parse_json_object_from_request,
+)
 from synapse.http.site import SynapseRequest
 from synapse.rest.admin._base import admin_patterns, assert_user_is_admin
 from synapse.types import JsonDict
@@ -29,37 +34,36 @@ logger = logging.getLogger(__name__)
 class BackgroundUpdateEnabledRestServlet(RestServlet):
     """Allows temporarily disabling background updates"""
 
-    PATTERNS = admin_patterns("/background_updates/enabled")
+    PATTERNS = admin_patterns("/background_updates/enabled$")
 
     def __init__(self, hs: "HomeServer"):
-        self.group_server = hs.get_groups_server_handler()
-        self.is_mine_id = hs.is_mine_id
-        self.auth = hs.get_auth()
-
-        self.data_stores = hs.get_datastores()
+        self._auth = hs.get_auth()
+        self._data_stores = hs.get_datastores()
 
     async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
-        requester = await self.auth.get_user_by_req(request)
-        await assert_user_is_admin(self.auth, requester.user)
+        requester = await self._auth.get_user_by_req(request)
+        await assert_user_is_admin(self._auth, requester.user)
 
         # We need to check that all configured databases have updates enabled.
         # (They *should* all be in sync.)
-        enabled = all(db.updates.enabled for db in self.data_stores.databases)
+        enabled = all(db.updates.enabled for db in self._data_stores.databases)
 
-        return 200, {"enabled": enabled}
+        return HTTPStatus.OK, {"enabled": enabled}
 
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
-        requester = await self.auth.get_user_by_req(request)
-        await assert_user_is_admin(self.auth, requester.user)
+        requester = await self._auth.get_user_by_req(request)
+        await assert_user_is_admin(self._auth, requester.user)
 
         body = parse_json_object_from_request(request)
 
         enabled = body.get("enabled", True)
 
         if not isinstance(enabled, bool):
-            raise SynapseError(400, "'enabled' parameter must be a boolean")
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST, "'enabled' parameter must be a boolean"
+            )
 
-        for db in self.data_stores.databases:
+        for db in self._data_stores.databases:
             db.updates.enabled = enabled
 
             # If we're re-enabling them ensure that we start the background
@@ -67,32 +71,29 @@ class BackgroundUpdateEnabledRestServlet(RestServlet):
             if enabled:
                 db.updates.start_doing_background_updates()
 
-        return 200, {"enabled": enabled}
+        return HTTPStatus.OK, {"enabled": enabled}
 
 
 class BackgroundUpdateRestServlet(RestServlet):
     """Fetch information about background updates"""
 
-    PATTERNS = admin_patterns("/background_updates/status")
+    PATTERNS = admin_patterns("/background_updates/status$")
 
     def __init__(self, hs: "HomeServer"):
-        self.group_server = hs.get_groups_server_handler()
-        self.is_mine_id = hs.is_mine_id
-        self.auth = hs.get_auth()
-
-        self.data_stores = hs.get_datastores()
+        self._auth = hs.get_auth()
+        self._data_stores = hs.get_datastores()
 
     async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
-        requester = await self.auth.get_user_by_req(request)
-        await assert_user_is_admin(self.auth, requester.user)
+        requester = await self._auth.get_user_by_req(request)
+        await assert_user_is_admin(self._auth, requester.user)
 
         # We need to check that all configured databases have updates enabled.
         # (They *should* all be in sync.)
-        enabled = all(db.updates.enabled for db in self.data_stores.databases)
+        enabled = all(db.updates.enabled for db in self._data_stores.databases)
 
         current_updates = {}
 
-        for db in self.data_stores.databases:
+        for db in self._data_stores.databases:
             update = db.updates.get_current_update()
             if not update:
                 continue
@@ -104,4 +105,72 @@ class BackgroundUpdateRestServlet(RestServlet):
                 "average_items_per_ms": update.average_items_per_ms(),
             }
 
-        return 200, {"enabled": enabled, "current_updates": current_updates}
+        return HTTPStatus.OK, {"enabled": enabled, "current_updates": current_updates}
+
+
+class BackgroundUpdateStartJobRestServlet(RestServlet):
+    """Allows to start specific background updates"""
+
+    PATTERNS = admin_patterns("/background_updates/start_job")
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastore()
+
+    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self._auth.get_user_by_req(request)
+        await assert_user_is_admin(self._auth, requester.user)
+
+        body = parse_json_object_from_request(request)
+        assert_params_in_dict(body, ["job_name"])
+
+        job_name = body["job_name"]
+
+        if job_name == "populate_stats_process_rooms":
+            jobs = [
+                {
+                    "update_name": "populate_stats_process_rooms",
+                    "progress_json": "{}",
+                },
+            ]
+        elif job_name == "regenerate_directory":
+            jobs = [
+                {
+                    "update_name": "populate_user_directory_createtables",
+                    "progress_json": "{}",
+                    "depends_on": "",
+                },
+                {
+                    "update_name": "populate_user_directory_process_rooms",
+                    "progress_json": "{}",
+                    "depends_on": "populate_user_directory_createtables",
+                },
+                {
+                    "update_name": "populate_user_directory_process_users",
+                    "progress_json": "{}",
+                    "depends_on": "populate_user_directory_process_rooms",
+                },
+                {
+                    "update_name": "populate_user_directory_cleanup",
+                    "progress_json": "{}",
+                    "depends_on": "populate_user_directory_process_users",
+                },
+            ]
+        else:
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")
+
+        try:
+            await self._store.db_pool.simple_insert_many(
+                table="background_updates",
+                values=jobs,
+                desc=f"admin_api_run_{job_name}",
+            )
+        except self._store.db_pool.engine.module.IntegrityError:
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Job %s is already in queue of background updates." % (job_name,),
+            )
+
+        self._store.db_pool.updates.start_doing_background_updates()
+
+        return HTTPStatus.OK, {}
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index b9a8ca997e..b104f9032c 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -122,6 +122,8 @@ class BackgroundUpdater:
 
     def start_doing_background_updates(self) -> None:
         if self.enabled:
+            # if we start a new background update, not all updates are done.
+            self._all_done = False
             run_as_background_process("background_updates", self.run_background_updates)
 
     async def run_background_updates(self, sleep: bool = True) -> None:
diff --git a/tests/rest/admin/test_background_updates.py b/tests/rest/admin/test_background_updates.py
index 78c48db552..1786316763 100644
--- a/tests/rest/admin/test_background_updates.py
+++ b/tests/rest/admin/test_background_updates.py
@@ -11,8 +11,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from http import HTTPStatus
+from typing import Collection
+
+from parameterized import parameterized
 
 import synapse.rest.admin
+from synapse.api.errors import Codes
 from synapse.rest.client import login
 from synapse.server import HomeServer
 
@@ -30,6 +35,60 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
         self.admin_user = self.register_user("admin", "pass", admin=True)
         self.admin_user_tok = self.login("admin", "pass")
 
+    @parameterized.expand(
+        [
+            ("GET", "/_synapse/admin/v1/background_updates/enabled"),
+            ("POST", "/_synapse/admin/v1/background_updates/enabled"),
+            ("GET", "/_synapse/admin/v1/background_updates/status"),
+            ("POST", "/_synapse/admin/v1/background_updates/start_job"),
+        ]
+    )
+    def test_requester_is_no_admin(self, method: str, url: str):
+        """
+        If the user is not a server admin, an error 403 is returned.
+        """
+
+        self.register_user("user", "pass", admin=False)
+        other_user_tok = self.login("user", "pass")
+
+        channel = self.make_request(
+            method,
+            url,
+            content={},
+            access_token=other_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+    def test_invalid_parameter(self):
+        """
+        If parameters are invalid, an error is returned.
+        """
+        url = "/_synapse/admin/v1/background_updates/start_job"
+
+        # empty content
+        channel = self.make_request(
+            "POST",
+            url,
+            content={},
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
+
+        # job_name invalid
+        channel = self.make_request(
+            "POST",
+            url,
+            content={"job_name": "unknown"},
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
+
     def _register_bg_update(self):
         "Adds a bg update but doesn't start it"
 
@@ -60,7 +119,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
             "/_synapse/admin/v1/background_updates/status",
             access_token=self.admin_user_tok,
         )
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
 
         # Background updates should be enabled, but none should be running.
         self.assertDictEqual(
@@ -82,7 +141,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
             "/_synapse/admin/v1/background_updates/status",
             access_token=self.admin_user_tok,
         )
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
 
         # Background updates should be enabled, and one should be running.
         self.assertDictEqual(
@@ -114,7 +173,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
             "/_synapse/admin/v1/background_updates/enabled",
             access_token=self.admin_user_tok,
         )
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
         self.assertDictEqual(channel.json_body, {"enabled": True})
 
         # Disable the BG updates
@@ -124,7 +183,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
             content={"enabled": False},
             access_token=self.admin_user_tok,
         )
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
         self.assertDictEqual(channel.json_body, {"enabled": False})
 
         # Advance a bit and get the current status, note this will finish the in
@@ -137,7 +196,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
             "/_synapse/admin/v1/background_updates/status",
             access_token=self.admin_user_tok,
         )
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
         self.assertDictEqual(
             channel.json_body,
             {
@@ -162,7 +221,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
             "/_synapse/admin/v1/background_updates/status",
             access_token=self.admin_user_tok,
         )
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
 
         # There should be no change from the previous /status response.
         self.assertDictEqual(
@@ -188,7 +247,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
             content={"enabled": True},
             access_token=self.admin_user_tok,
         )
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
 
         self.assertDictEqual(channel.json_body, {"enabled": True})
 
@@ -199,7 +258,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
             "/_synapse/admin/v1/background_updates/status",
             access_token=self.admin_user_tok,
         )
-        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
 
         # Background updates should be enabled and making progress.
         self.assertDictEqual(
@@ -216,3 +275,82 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
                 "enabled": True,
             },
         )
+
+    @parameterized.expand(
+        [
+            ("populate_stats_process_rooms", ["populate_stats_process_rooms"]),
+            (
+                "regenerate_directory",
+                [
+                    "populate_user_directory_createtables",
+                    "populate_user_directory_process_rooms",
+                    "populate_user_directory_process_users",
+                    "populate_user_directory_cleanup",
+                ],
+            ),
+        ]
+    )
+    def test_start_backround_job(self, job_name: str, updates: Collection[str]):
+        """
+        Test that background updates add to database and be processed.
+
+        Args:
+            job_name: name of the job to call with API
+            updates: collection of background updates to be started
+        """
+
+        # no background update is waiting
+        self.assertTrue(
+            self.get_success(
+                self.store.db_pool.updates.has_completed_background_updates()
+            )
+        )
+
+        channel = self.make_request(
+            "POST",
+            "/_synapse/admin/v1/background_updates/start_job",
+            content={"job_name": job_name},
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+
+        # test that each background update is waiting now
+        for update in updates:
+            self.assertFalse(
+                self.get_success(
+                    self.store.db_pool.updates.has_completed_background_update(update)
+                )
+            )
+
+        self.wait_for_background_updates()
+
+        # background updates are done
+        self.assertTrue(
+            self.get_success(
+                self.store.db_pool.updates.has_completed_background_updates()
+            )
+        )
+
+    def test_start_backround_job_twice(self):
+        """Test that add a background update twice return an error."""
+
+        # add job to database
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                table="background_updates",
+                values={
+                    "update_name": "populate_stats_process_rooms",
+                    "progress_json": "{}",
+                },
+            )
+        )
+
+        channel = self.make_request(
+            "POST",
+            "/_synapse/admin/v1/background_updates/start_job",
+            content={"job_name": "populate_stats_process_rooms"},
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)