diff --git a/changelog.d/11352.feature b/changelog.d/11352.feature
new file mode 100644
index 0000000000..a4d01b3549
--- /dev/null
+++ b/changelog.d/11352.feature
@@ -0,0 +1 @@
+Add admin API to run background jobs.
\ No newline at end of file
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 3c931468aa..aee300013f 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -2360,8 +2360,8 @@ user_directory:
# indexes were (re)built was before Synapse 1.44, you'll have to
# rebuild the indexes in order to search through all known users.
# These indexes are built the first time Synapse starts; admins can
- # manually trigger a rebuild following the instructions at
- # https://matrix-org.github.io/synapse/latest/user_directory.html
+ # manually trigger a rebuild via API following the instructions at
+ # https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run
#
# Uncomment to return search results containing all known users, even if that
# user does not share a room with the requester.
diff --git a/docs/usage/administration/admin_api/background_updates.md b/docs/usage/administration/admin_api/background_updates.md
index b36d7fe398..9f6ac7d567 100644
--- a/docs/usage/administration/admin_api/background_updates.md
+++ b/docs/usage/administration/admin_api/background_updates.md
@@ -42,7 +42,6 @@ For each update:
`average_items_per_ms` how many items are processed per millisecond based on an exponential average.
-
## Enabled
This API allow pausing background updates.
@@ -82,3 +81,29 @@ The API returns the `enabled` param.
```
There is also a `GET` version which returns the `enabled` state.
+
+
+## Run
+
+This API schedules a specific background update to run. The job starts immediately after calling the API.
+
+
+The API is:
+
+```
+POST /_synapse/admin/v1/background_updates/start_job
+```
+
+with the following body:
+
+```json
+{
+ "job_name": "populate_stats_process_rooms"
+}
+```
+
+The following JSON body parameters are available:
+
+- `job_name` - A string naming the job to run. Valid values are:
+ - `populate_stats_process_rooms` - Recalculate the stats for all rooms.
+ - `regenerate_directory` - Recalculate the [user directory](../../../user_directory.md) if it is stale or out of sync.
diff --git a/docs/user_directory.md b/docs/user_directory.md
index 07fe954891..c4794b04cf 100644
--- a/docs/user_directory.md
+++ b/docs/user_directory.md
@@ -6,9 +6,9 @@ on this particular server - i.e. ones which your account shares a room with, or
who are present in a publicly viewable room present on the server.
The directory info is stored in various tables, which can (typically after
-DB corruption) get stale or out of sync. If this happens, for now the
-solution to fix it is to execute the SQL [here](https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/main/delta/53/user_dir_populate.sql)
-and then restart synapse. This should then start a background task to
+DB corruption) get stale or out of sync. If this happens, the current
+fix is to call the [admin API](usage/administration/admin_api/background_updates.md#run)
+and run the job `regenerate_directory`. This should then start a background task to
flush the current tables and regenerate the directory.
Data model
diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py
index 2552f688d0..6d6678c7e4 100644
--- a/synapse/config/user_directory.py
+++ b/synapse/config/user_directory.py
@@ -53,8 +53,8 @@ class UserDirectoryConfig(Config):
# indexes were (re)built was before Synapse 1.44, you'll have to
# rebuild the indexes in order to search through all known users.
# These indexes are built the first time Synapse starts; admins can
- # manually trigger a rebuild following the instructions at
- # https://matrix-org.github.io/synapse/latest/user_directory.html
+ # manually trigger a rebuild via API following the instructions at
+ # https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/background_updates.html#run
#
# Uncomment to return search results containing all known users, even if that
# user does not share a room with the requester.
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 65b76fa10c..ee4a5e481b 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -28,6 +28,7 @@ from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
from synapse.rest.admin.background_updates import (
BackgroundUpdateEnabledRestServlet,
BackgroundUpdateRestServlet,
+ BackgroundUpdateStartJobRestServlet,
)
from synapse.rest.admin.devices import (
DeleteDevicesRestServlet,
@@ -261,6 +262,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SendServerNoticeServlet(hs).register(http_server)
BackgroundUpdateEnabledRestServlet(hs).register(http_server)
BackgroundUpdateRestServlet(hs).register(http_server)
+ BackgroundUpdateStartJobRestServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(
diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py
index 0d0183bf20..479672d4d5 100644
--- a/synapse/rest/admin/background_updates.py
+++ b/synapse/rest/admin/background_updates.py
@@ -12,10 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from http import HTTPStatus
from typing import TYPE_CHECKING, Tuple
from synapse.api.errors import SynapseError
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import (
+ RestServlet,
+ assert_params_in_dict,
+ parse_json_object_from_request,
+)
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_user_is_admin
from synapse.types import JsonDict
@@ -29,37 +34,36 @@ logger = logging.getLogger(__name__)
class BackgroundUpdateEnabledRestServlet(RestServlet):
"""Allows temporarily disabling background updates"""
- PATTERNS = admin_patterns("/background_updates/enabled")
+ PATTERNS = admin_patterns("/background_updates/enabled$")
def __init__(self, hs: "HomeServer"):
- self.group_server = hs.get_groups_server_handler()
- self.is_mine_id = hs.is_mine_id
- self.auth = hs.get_auth()
-
- self.data_stores = hs.get_datastores()
+ self._auth = hs.get_auth()
+ self._data_stores = hs.get_datastores()
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
- requester = await self.auth.get_user_by_req(request)
- await assert_user_is_admin(self.auth, requester.user)
+ requester = await self._auth.get_user_by_req(request)
+ await assert_user_is_admin(self._auth, requester.user)
# We need to check that all configured databases have updates enabled.
# (They *should* all be in sync.)
- enabled = all(db.updates.enabled for db in self.data_stores.databases)
+ enabled = all(db.updates.enabled for db in self._data_stores.databases)
- return 200, {"enabled": enabled}
+ return HTTPStatus.OK, {"enabled": enabled}
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
- requester = await self.auth.get_user_by_req(request)
- await assert_user_is_admin(self.auth, requester.user)
+ requester = await self._auth.get_user_by_req(request)
+ await assert_user_is_admin(self._auth, requester.user)
body = parse_json_object_from_request(request)
enabled = body.get("enabled", True)
if not isinstance(enabled, bool):
- raise SynapseError(400, "'enabled' parameter must be a boolean")
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST, "'enabled' parameter must be a boolean"
+ )
- for db in self.data_stores.databases:
+ for db in self._data_stores.databases:
db.updates.enabled = enabled
# If we're re-enabling them ensure that we start the background
@@ -67,32 +71,29 @@ class BackgroundUpdateEnabledRestServlet(RestServlet):
if enabled:
db.updates.start_doing_background_updates()
- return 200, {"enabled": enabled}
+ return HTTPStatus.OK, {"enabled": enabled}
class BackgroundUpdateRestServlet(RestServlet):
"""Fetch information about background updates"""
- PATTERNS = admin_patterns("/background_updates/status")
+ PATTERNS = admin_patterns("/background_updates/status$")
def __init__(self, hs: "HomeServer"):
- self.group_server = hs.get_groups_server_handler()
- self.is_mine_id = hs.is_mine_id
- self.auth = hs.get_auth()
-
- self.data_stores = hs.get_datastores()
+ self._auth = hs.get_auth()
+ self._data_stores = hs.get_datastores()
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
- requester = await self.auth.get_user_by_req(request)
- await assert_user_is_admin(self.auth, requester.user)
+ requester = await self._auth.get_user_by_req(request)
+ await assert_user_is_admin(self._auth, requester.user)
# We need to check that all configured databases have updates enabled.
# (They *should* all be in sync.)
- enabled = all(db.updates.enabled for db in self.data_stores.databases)
+ enabled = all(db.updates.enabled for db in self._data_stores.databases)
current_updates = {}
- for db in self.data_stores.databases:
+ for db in self._data_stores.databases:
update = db.updates.get_current_update()
if not update:
continue
@@ -104,4 +105,72 @@ class BackgroundUpdateRestServlet(RestServlet):
"average_items_per_ms": update.average_items_per_ms(),
}
- return 200, {"enabled": enabled, "current_updates": current_updates}
+ return HTTPStatus.OK, {"enabled": enabled, "current_updates": current_updates}
+
+
+class BackgroundUpdateStartJobRestServlet(RestServlet):
+    """Queue a named job's background updates and kick off the updater.
+
+    Serves `POST /_synapse/admin/v1/background_updates/start_job`. The JSON
+    body must be an object containing `job_name`; the accepted values are
+    `populate_stats_process_rooms` and `regenerate_directory`.
+    """
+
+    # Anchored with `$`, like the other servlets in this module, so that only
+    # this exact path matches and not longer paths sharing the prefix.
+    PATTERNS = admin_patterns("/background_updates/start_job$")
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastore()
+
+    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        """Insert the rows for the requested job and start background updates.
+
+        Args:
+            request: The incoming request; the requester must be a server
+                admin and the body must contain `job_name`.
+
+        Returns:
+            A tuple of `HTTPStatus.OK` and an empty JSON object.
+
+        Raises:
+            SynapseError: with `HTTPStatus.BAD_REQUEST` if `job_name` is not
+                one of the known jobs, or if inserting the rows raises the
+                database engine's `IntegrityError`. Missing parameters are
+                rejected by `assert_params_in_dict`.
+        """
+        requester = await self._auth.get_user_by_req(request)
+        await assert_user_is_admin(self._auth, requester.user)
+
+        body = parse_json_object_from_request(request)
+        assert_params_in_dict(body, ["job_name"])
+
+        job_name = body["job_name"]
+
+        if job_name == "populate_stats_process_rooms":
+            jobs = [
+                {
+                    "update_name": "populate_stats_process_rooms",
+                    "progress_json": "{}",
+                },
+            ]
+        elif job_name == "regenerate_directory":
+            # Each step names the previous one in `depends_on`; presumably
+            # this is what makes the updater run them in this order.
+            jobs = [
+                {
+                    "update_name": "populate_user_directory_createtables",
+                    "progress_json": "{}",
+                    "depends_on": "",
+                },
+                {
+                    "update_name": "populate_user_directory_process_rooms",
+                    "progress_json": "{}",
+                    "depends_on": "populate_user_directory_createtables",
+                },
+                {
+                    "update_name": "populate_user_directory_process_users",
+                    "progress_json": "{}",
+                    "depends_on": "populate_user_directory_process_rooms",
+                },
+                {
+                    "update_name": "populate_user_directory_cleanup",
+                    "progress_json": "{}",
+                    "depends_on": "populate_user_directory_process_users",
+                },
+            ]
+        else:
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")
+
+        try:
+            await self._store.db_pool.simple_insert_many(
+                table="background_updates",
+                values=jobs,
+                desc=f"admin_api_run_{job_name}",
+            )
+        except self._store.db_pool.engine.module.IntegrityError:
+            # Reported to the caller as "already queued"; presumably the
+            # insert collided with an existing row for the same update.
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Job %s is already in queue of background updates." % (job_name,),
+            )
+
+        self._store.db_pool.updates.start_doing_background_updates()
+
+        return HTTPStatus.OK, {}
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index b9a8ca997e..b104f9032c 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -122,6 +122,8 @@ class BackgroundUpdater:
def start_doing_background_updates(self) -> None:
if self.enabled:
+ # if we start a new background update, not all updates are done, so
+ # reset the `_all_done` flag before spawning the background process.
+ # Note that the flag is only reset while updates are enabled.
+ self._all_done = False
run_as_background_process("background_updates", self.run_background_updates)
async def run_background_updates(self, sleep: bool = True) -> None:
diff --git a/tests/rest/admin/test_background_updates.py b/tests/rest/admin/test_background_updates.py
index 78c48db552..1786316763 100644
--- a/tests/rest/admin/test_background_updates.py
+++ b/tests/rest/admin/test_background_updates.py
@@ -11,8 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from http import HTTPStatus
+from typing import Collection
+
+from parameterized import parameterized
import synapse.rest.admin
+from synapse.api.errors import Codes
from synapse.rest.client import login
from synapse.server import HomeServer
@@ -30,6 +35,60 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
+ @parameterized.expand(
+ [
+ ("GET", "/_synapse/admin/v1/background_updates/enabled"),
+ ("POST", "/_synapse/admin/v1/background_updates/enabled"),
+ ("GET", "/_synapse/admin/v1/background_updates/status"),
+ ("POST", "/_synapse/admin/v1/background_updates/start_job"),
+ ]
+ )
+ def test_requester_is_no_admin(self, method: str, url: str):
+ """
+ If the user is not a server admin, an error 403 is returned.
+
+ Args:
+ method: HTTP method to use for the request
+ url: admin API endpoint to call
+ """
+
+ # Register and log in a second, non-admin user for the request.
+ self.register_user("user", "pass", admin=False)
+ other_user_tok = self.login("user", "pass")
+
+ channel = self.make_request(
+ method,
+ url,
+ content={},
+ access_token=other_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+ def test_invalid_parameter(self):
+ """
+ If parameters are invalid, an error is returned.
+
+ Covers a body without `job_name` and a body with an unknown `job_name`.
+ """
+ url = "/_synapse/admin/v1/background_updates/start_job"
+
+ # empty content: `job_name` is missing
+ channel = self.make_request(
+ "POST",
+ url,
+ content={},
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
+
+ # job_name invalid: not one of the jobs the API knows about
+ channel = self.make_request(
+ "POST",
+ url,
+ content={"job_name": "unknown"},
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
+
def _register_bg_update(self):
"Adds a bg update but doesn't start it"
@@ -60,7 +119,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/status",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Background updates should be enabled, but none should be running.
self.assertDictEqual(
@@ -82,7 +141,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/status",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Background updates should be enabled, and one should be running.
self.assertDictEqual(
@@ -114,7 +173,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/enabled",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertDictEqual(channel.json_body, {"enabled": True})
# Disable the BG updates
@@ -124,7 +183,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
content={"enabled": False},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertDictEqual(channel.json_body, {"enabled": False})
# Advance a bit and get the current status, note this will finish the in
@@ -137,7 +196,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/status",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertDictEqual(
channel.json_body,
{
@@ -162,7 +221,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/status",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# There should be no change from the previous /status response.
self.assertDictEqual(
@@ -188,7 +247,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
content={"enabled": True},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertDictEqual(channel.json_body, {"enabled": True})
@@ -199,7 +258,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"/_synapse/admin/v1/background_updates/status",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Background updates should be enabled and making progress.
self.assertDictEqual(
@@ -216,3 +275,82 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
"enabled": True,
},
)
+
+ @parameterized.expand(
+ [
+ ("populate_stats_process_rooms", ["populate_stats_process_rooms"]),
+ (
+ "regenerate_directory",
+ [
+ "populate_user_directory_createtables",
+ "populate_user_directory_process_rooms",
+ "populate_user_directory_process_users",
+ "populate_user_directory_cleanup",
+ ],
+ ),
+ ]
+ )
+ def test_start_backround_job(self, job_name: str, updates: Collection[str]):
+ """
+ Test that the background updates are added to the database and processed.
+
+ Args:
+ job_name: name of the job to call with API
+ updates: collection of background updates to be started
+ """
+
+ # no background update is waiting
+ self.assertTrue(
+ self.get_success(
+ self.store.db_pool.updates.has_completed_background_updates()
+ )
+ )
+
+ channel = self.make_request(
+ "POST",
+ "/_synapse/admin/v1/background_updates/start_job",
+ content={"job_name": job_name},
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+
+ # test that each background update is waiting now
+ for update in updates:
+ self.assertFalse(
+ self.get_success(
+ self.store.db_pool.updates.has_completed_background_update(update)
+ )
+ )
+
+ self.wait_for_background_updates()
+
+ # background updates are done
+ self.assertTrue(
+ self.get_success(
+ self.store.db_pool.updates.has_completed_background_updates()
+ )
+ )
+
+ def test_start_backround_job_twice(self):
+ """Test that adding a background update twice returns an error."""
+
+ # add job to database directly, so the API call below requests an
+ # update that is already present in the `background_updates` table
+ self.get_success(
+ self.store.db_pool.simple_insert(
+ table="background_updates",
+ values={
+ "update_name": "populate_stats_process_rooms",
+ "progress_json": "{}",
+ },
+ )
+ )
+
+ channel = self.make_request(
+ "POST",
+ "/_synapse/admin/v1/background_updates/start_job",
+ content={"job_name": "populate_stats_process_rooms"},
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
|