summary refs log tree commit diff
diff options
context:
space:
mode:
authorQuentin Gliech <quenting@element.io>2025-05-02 15:37:58 +0200
committerGitHub <noreply@github.com>2025-05-02 15:37:58 +0200
commitb8146d4b03d89a9407125b5934bd7accbe0680e0 (patch)
treebecf0d2ca7d01bc98609b7cfa6a0fdf595ba6c6d
parentApply `should_drop_federated_event` to federation invites (#18330) (diff)
downloadsynapse-b8146d4b03d89a9407125b5934bd7accbe0680e0.tar.xz
Allow a few admin APIs used by MAS to run on workers (#18313)
This should be reviewed commit by commit.

It adds a few admin servlets that are used by MAS when in delegation
mode to workers

---------

Co-authored-by: Olivier 'reivilibre <oliverw@matrix.org>
Co-authored-by: Devon Hudson <devon.dmytro@gmail.com>
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
-rw-r--r--changelog.d/18313.misc1
-rw-r--r--docs/workers.md9
-rw-r--r--synapse/app/generic_worker.py18
-rw-r--r--synapse/app/homeserver.py9
-rw-r--r--synapse/handlers/set_password.py18
-rw-r--r--synapse/rest/__init__.py1
-rw-r--r--synapse/rest/admin/__init__.py29
-rw-r--r--synapse/rest/admin/devices.py26
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py78
-rw-r--r--synapse/storage/databases/main/registration.py260
10 files changed, 249 insertions, 200 deletions
diff --git a/changelog.d/18313.misc b/changelog.d/18313.misc
new file mode 100644

index 0000000000..febf3ac06e --- /dev/null +++ b/changelog.d/18313.misc
@@ -0,0 +1 @@ +Allow a few admin APIs used by matrix-authentication-service to run on workers. diff --git a/docs/workers.md b/docs/workers.md
index 2597e78217..45a00696f3 100644 --- a/docs/workers.md +++ b/docs/workers.md
@@ -323,6 +323,15 @@ For multiple workers not handling the SSO endpoints properly, see [#7530](https://github.com/matrix-org/synapse/issues/7530) and [#9427](https://github.com/matrix-org/synapse/issues/9427). +Additionally, when MSC3861 is enabled (`experimental_features.msc3861.enabled` +set to `true`), the following endpoints can be handled by the worker: + + ^/_synapse/admin/v2/users/[^/]+$ + ^/_synapse/admin/v1/username_available$ + ^/_synapse/admin/v1/users/[^/]+/_allow_cross_signing_replacement_without_uia$ + # Only the GET method: + ^/_synapse/admin/v1/users/[^/]+/devices$ + Note that a [HTTP listener](usage/configuration/config_documentation.md#listeners) with `client` and `federation` `resources` must be configured in the [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index e4120ed424..f495d5b7e4 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py
@@ -51,8 +51,7 @@ from synapse.http.server import JsonResource, OptionsResource from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource -from synapse.rest import ClientRestResource -from synapse.rest.admin import AdminRestResource, register_servlets_for_media_repo +from synapse.rest import ClientRestResource, admin from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyResource from synapse.rest.synapse.client import build_synapse_client_resource_tree @@ -176,8 +175,13 @@ class GenericWorkerServer(HomeServer): def _listen_http(self, listener_config: ListenerConfig) -> None: assert listener_config.http_options is not None - # We always include a health resource. - resources: Dict[str, Resource] = {"/health": HealthResource()} + # We always include an admin resource that we populate with servlets as needed + admin_resource = JsonResource(self, canonical_json=False) + resources: Dict[str, Resource] = { + # We always include a health resource. + "/health": HealthResource(), + "/_synapse/admin": admin_resource, + } for res in listener_config.http_options.resources: for name in res.names: @@ -190,7 +194,7 @@ class GenericWorkerServer(HomeServer): resources.update(build_synapse_client_resource_tree(self)) resources["/.well-known"] = well_known_resource(self) - resources["/_synapse/admin"] = AdminRestResource(self) + admin.register_servlets(self, admin_resource) elif name == "federation": resources[FEDERATION_PREFIX] = TransportLayerServer(self) @@ -200,15 +204,13 @@ class GenericWorkerServer(HomeServer): # We need to serve the admin servlets for media on the # worker. - admin_resource = JsonResource(self, canonical_json=False) - register_servlets_for_media_repo(self, admin_resource) + admin.register_servlets_for_media_repo(self, admin_resource) resources.update( { MEDIA_R0_PREFIX: media_repo, MEDIA_V3_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo, - "/_synapse/admin": admin_resource, } ) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 2a824e8457..6da2194cf7 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py
@@ -54,6 +54,7 @@ from synapse.config.server import ListenerConfig, TCPListenerConfig from synapse.federation.transport.server import TransportLayerServer from synapse.http.additional_resource import AdditionalResource from synapse.http.server import ( + JsonResource, OptionsResource, RootOptionsRedirectResource, StaticResource, @@ -61,8 +62,7 @@ from synapse.http.server import ( from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource -from synapse.rest import ClientRestResource -from synapse.rest.admin import AdminRestResource +from synapse.rest import ClientRestResource, admin from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyResource from synapse.rest.synapse.client import build_synapse_client_resource_tree @@ -180,11 +180,14 @@ class SynapseHomeServer(HomeServer): if compress: client_resource = gz_wrap(client_resource) + admin_resource = JsonResource(self, canonical_json=False) + admin.register_servlets(self, admin_resource) + resources.update( { CLIENT_API_PREFIX: client_resource, "/.well-known": well_known_resource(self), - "/_synapse/admin": AdminRestResource(self), + "/_synapse/admin": admin_resource, **build_synapse_client_resource_tree(self), } ) diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index 29cc03d71d..94301add9e 100644 --- a/synapse/handlers/set_password.py +++ b/synapse/handlers/set_password.py
@@ -36,10 +36,17 @@ class SetPasswordHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastores().main self._auth_handler = hs.get_auth_handler() - # This can only be instantiated on the main process. - device_handler = hs.get_device_handler() - assert isinstance(device_handler, DeviceHandler) - self._device_handler = device_handler + + # We don't need the device handler if password changing is disabled. + # This allows us to instantiate the SetPasswordHandler on the workers + # that have admin APIs for MAS + if self._auth_handler.can_change_password(): + # This can only be instantiated on the main process. + device_handler = hs.get_device_handler() + assert isinstance(device_handler, DeviceHandler) + self._device_handler: Optional[DeviceHandler] = device_handler + else: + self._device_handler = None async def set_password( self, @@ -51,6 +58,9 @@ class SetPasswordHandler: if not self._auth_handler.can_change_password(): raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN) + # We should have this available only if password changing is enabled. + assert self._device_handler is not None + try: await self.store.user_set_password_hash(user_id, password_hash) except StoreError as e: diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 2f1ef84e26..00f108de08 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py
@@ -187,7 +187,6 @@ class ClientRestResource(JsonResource): mutual_rooms.register_servlets, login_token_request.register_servlets, rendezvous.register_servlets, - auth_metadata.register_servlets, ]: continue diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index cf809d1a27..b1335fed66 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py
@@ -39,7 +39,7 @@ from typing import TYPE_CHECKING, Optional, Tuple from synapse.api.errors import Codes, NotFoundError, SynapseError from synapse.handlers.pagination import PURGE_HISTORY_ACTION_NAME -from synapse.http.server import HttpServer, JsonResource +from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseRequest from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin @@ -51,6 +51,7 @@ from synapse.rest.admin.background_updates import ( from synapse.rest.admin.devices import ( DeleteDevicesRestServlet, DeviceRestServlet, + DevicesGetRestServlet, DevicesRestServlet, ) from synapse.rest.admin.event_reports import ( @@ -264,14 +265,6 @@ class PurgeHistoryStatusRestServlet(RestServlet): ######################################################################################## -class AdminRestResource(JsonResource): - """The REST resource which gets mounted at /_synapse/admin""" - - def __init__(self, hs: "HomeServer"): - JsonResource.__init__(self, hs, canonical_json=False) - register_servlets(hs, self) - - def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: """ Register all the admin servlets. @@ -280,6 +273,10 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: # Admin servlets below may not work on workers. if hs.config.worker.worker_app is not None: + # Some admin servlets can be mounted on workers when MSC3861 is enabled. + if hs.config.experimental.msc3861.enabled: + register_servlets_for_msc3861_delegation(hs, http_server) + return register_servlets_for_client_rest_resource(hs, http_server) @@ -367,4 +364,16 @@ def register_servlets_for_client_rest_resource( ListMediaInRoom(hs).register(http_server) # don't add more things here: new servlets should only be exposed on - # /_synapse/admin so should not go here. Instead register them in AdminRestResource. + # /_synapse/admin so should not go here. Instead register them in register_servlets. + + +def register_servlets_for_msc3861_delegation( + hs: "HomeServer", http_server: HttpServer +) -> None: + """Register servlets needed by MAS when MSC3861 is enabled""" + assert hs.config.experimental.msc3861.enabled + + UserRestServletV2(hs).register(http_server) + UsernameAvailableRestServlet(hs).register(http_server) + UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server) + DevicesGetRestServlet(hs).register(http_server) diff --git a/synapse/rest/admin/devices.py b/synapse/rest/admin/devices.py
index 449b066923..125ed8c491 100644 --- a/synapse/rest/admin/devices.py +++ b/synapse/rest/admin/devices.py
@@ -113,18 +113,19 @@ class DeviceRestServlet(RestServlet): return HTTPStatus.OK, {} -class DevicesRestServlet(RestServlet): +class DevicesGetRestServlet(RestServlet): """ Retrieve the given user's devices + + This can be mounted on workers as it is read-only, as opposed + to `DevicesRestServlet`. """ PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2") def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() - handler = hs.get_device_handler() - assert isinstance(handler, DeviceHandler) - self.device_handler = handler + self.device_worker_handler = hs.get_device_handler() self.store = hs.get_datastores().main self.is_mine = hs.is_mine @@ -141,9 +142,24 @@ class DevicesRestServlet(RestServlet): if u is None: raise NotFoundError("Unknown user") - devices = await self.device_handler.get_devices_by_user(target_user.to_string()) + devices = await self.device_worker_handler.get_devices_by_user( + target_user.to_string() + ) return HTTPStatus.OK, {"devices": devices, "total": len(devices)} + +class DevicesRestServlet(DevicesGetRestServlet): + """ + Retrieve the given user's devices + """ + + PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2") + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + assert isinstance(self.device_worker_handler, DeviceHandler) + self.device_handler = self.device_worker_handler + async def on_POST( self, request: SynapseRequest, user_id: str ) -> Tuple[int, JsonDict]: diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index b4c7069958..341e7014d6 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1501,6 +1501,45 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker "delete_old_otks_for_next_user_batch", impl ) + async def allow_master_cross_signing_key_replacement_without_uia( + self, user_id: str, duration_ms: int + ) -> Optional[int]: + """Mark this user's latest master key as being replaceable without UIA. + + Said replacement will only be permitted for a short time after calling this + function. That time period is controlled by the duration argument. + + Returns: + None, if there is no such key. + Otherwise, the timestamp before which replacement is allowed without UIA. + """ + timestamp = self._clock.time_msec() + duration_ms + + def impl(txn: LoggingTransaction) -> Optional[int]: + txn.execute( + """ + UPDATE e2e_cross_signing_keys + SET updatable_without_uia_before_ms = ? + WHERE stream_id = ( + SELECT stream_id + FROM e2e_cross_signing_keys + WHERE user_id = ? AND keytype = 'master' + ORDER BY stream_id DESC + LIMIT 1 + ) + """, + (timestamp, user_id), + ) + if txn.rowcount == 0: + return None + + return timestamp + + return await self.db_pool.runInteraction( + "allow_master_cross_signing_key_replacement_without_uia", + impl, + ) + class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): def __init__( @@ -1755,42 +1794,3 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): ], desc="add_e2e_signing_key", ) - - async def allow_master_cross_signing_key_replacement_without_uia( - self, user_id: str, duration_ms: int - ) -> Optional[int]: - """Mark this user's latest master key as being replaceable without UIA. - - Said replacement will only be permitted for a short time after calling this - function. That time period is controlled by the duration argument. - - Returns: - None, if there is no such key. - Otherwise, the timestamp before which replacement is allowed without UIA. - """ - timestamp = self._clock.time_msec() + duration_ms - - def impl(txn: LoggingTransaction) -> Optional[int]: - txn.execute( - """ - UPDATE e2e_cross_signing_keys - SET updatable_without_uia_before_ms = ? - WHERE stream_id = ( - SELECT stream_id - FROM e2e_cross_signing_keys - WHERE user_id = ? AND keytype = 'master' - ORDER BY stream_id DESC - LIMIT 1 - ) - """, - (timestamp, user_id), - ) - if txn.rowcount == 0: - return None - - return timestamp - - return await self.db_pool.runInteraction( - "allow_master_cross_signing_key_replacement_without_uia", - impl, - ) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index c43f31353b..1aeae951c5 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -2105,6 +2105,136 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): func=is_user_approved_txn, ) + async def set_user_deactivated_status( + self, user_id: str, deactivated: bool + ) -> None: + """Set the `deactivated` property for the provided user to the provided value. + + Args: + user_id: The ID of the user to set the status for. + deactivated: The value to set for `deactivated`. + """ + + await self.db_pool.runInteraction( + "set_user_deactivated_status", + self.set_user_deactivated_status_txn, + user_id, + deactivated, + ) + + def set_user_deactivated_status_txn( + self, txn: LoggingTransaction, user_id: str, deactivated: bool + ) -> None: + self.db_pool.simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"deactivated": 1 if deactivated else 0}, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_deactivated_status, (user_id,) + ) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) + self._invalidate_cache_and_stream(txn, self.is_guest, (user_id,)) + + async def set_user_suspended_status(self, user_id: str, suspended: bool) -> None: + """ + Set whether the user's account is suspended in the `users` table. + + Args: + user_id: The user ID of the user in question + suspended: True if the user is suspended, false if not + """ + await self.db_pool.runInteraction( + "set_user_suspended_status", + self.set_user_suspended_status_txn, + user_id, + suspended, + ) + + def set_user_suspended_status_txn( + self, txn: LoggingTransaction, user_id: str, suspended: bool + ) -> None: + self.db_pool.simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"suspended": suspended}, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_suspended_status, (user_id,) + ) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) + + async def set_user_locked_status(self, user_id: str, locked: bool) -> None: + """Set the `locked` property for the provided user to the provided value. + + Args: + user_id: The ID of the user to set the status for. + locked: The value to set for `locked`. + """ + + await self.db_pool.runInteraction( + "set_user_locked_status", + self.set_user_locked_status_txn, + user_id, + locked, + ) + + def set_user_locked_status_txn( + self, txn: LoggingTransaction, user_id: str, locked: bool + ) -> None: + self.db_pool.simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"locked": locked}, + ) + self._invalidate_cache_and_stream(txn, self.get_user_locked_status, (user_id,)) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) + + async def update_user_approval_status( + self, user_id: UserID, approved: bool + ) -> None: + """Set the user's 'approved' flag to the given value. + + The boolean will be turned into an int (in update_user_approval_status_txn) + because the column is a smallint. + + Args: + user_id: the user to update the flag for. + approved: the value to set the flag to. + """ + await self.db_pool.runInteraction( + "update_user_approval_status", + self.update_user_approval_status_txn, + user_id.to_string(), + approved, + ) + + def update_user_approval_status_txn( + self, txn: LoggingTransaction, user_id: str, approved: bool + ) -> None: + """Set the user's 'approved' flag to the given value. + + The boolean is turned into an int because the column is a smallint. + + Args: + txn: the current database transaction. + user_id: the user to update the flag for. + approved: the value to set the flag to. + """ + self.db_pool.simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"approved": approved}, + ) + + # Invalidate the caches of methods that read the value of the 'approved' flag. + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) + self._invalidate_cache_and_stream(txn, self.is_user_approved, (user_id,)) + class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): def __init__( @@ -2217,117 +2347,6 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): return nb_processed - async def set_user_deactivated_status( - self, user_id: str, deactivated: bool - ) -> None: - """Set the `deactivated` property for the provided user to the provided value. - - Args: - user_id: The ID of the user to set the status for. - deactivated: The value to set for `deactivated`. - """ - - await self.db_pool.runInteraction( - "set_user_deactivated_status", - self.set_user_deactivated_status_txn, - user_id, - deactivated, - ) - - def set_user_deactivated_status_txn( - self, txn: LoggingTransaction, user_id: str, deactivated: bool - ) -> None: - self.db_pool.simple_update_one_txn( - txn=txn, - table="users", - keyvalues={"name": user_id}, - updatevalues={"deactivated": 1 if deactivated else 0}, - ) - self._invalidate_cache_and_stream( - txn, self.get_user_deactivated_status, (user_id,) - ) - self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) - txn.call_after(self.is_guest.invalidate, (user_id,)) - - async def set_user_suspended_status(self, user_id: str, suspended: bool) -> None: - """ - Set whether the user's account is suspended in the `users` table. - - Args: - user_id: The user ID of the user in question - suspended: True if the user is suspended, false if not - """ - await self.db_pool.runInteraction( - "set_user_suspended_status", - self.set_user_suspended_status_txn, - user_id, - suspended, - ) - - def set_user_suspended_status_txn( - self, txn: LoggingTransaction, user_id: str, suspended: bool - ) -> None: - self.db_pool.simple_update_one_txn( - txn=txn, - table="users", - keyvalues={"name": user_id}, - updatevalues={"suspended": suspended}, - ) - self._invalidate_cache_and_stream( - txn, self.get_user_suspended_status, (user_id,) - ) - self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) - - async def set_user_locked_status(self, user_id: str, locked: bool) -> None: - """Set the `locked` property for the provided user to the provided value. - - Args: - user_id: The ID of the user to set the status for. - locked: The value to set for `locked`. - """ - - await self.db_pool.runInteraction( - "set_user_locked_status", - self.set_user_locked_status_txn, - user_id, - locked, - ) - - def set_user_locked_status_txn( - self, txn: LoggingTransaction, user_id: str, locked: bool - ) -> None: - self.db_pool.simple_update_one_txn( - txn=txn, - table="users", - keyvalues={"name": user_id}, - updatevalues={"locked": locked}, - ) - self._invalidate_cache_and_stream(txn, self.get_user_locked_status, (user_id,)) - self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) - - def update_user_approval_status_txn( - self, txn: LoggingTransaction, user_id: str, approved: bool - ) -> None: - """Set the user's 'approved' flag to the given value. - - The boolean is turned into an int because the column is a smallint. - - Args: - txn: the current database transaction. - user_id: the user to update the flag for. - approved: the value to set the flag to. - """ - self.db_pool.simple_update_one_txn( - txn=txn, - table="users", - keyvalues={"name": user_id}, - updatevalues={"approved": approved}, - ) - - # Invalidate the caches of methods that read the value of the 'approved' flag. - self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) - self._invalidate_cache_and_stream(txn, self.is_user_approved, (user_id,)) - class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): def __init__( @@ -2956,25 +2975,6 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): start_or_continue_validation_session_txn, ) - async def update_user_approval_status( - self, user_id: UserID, approved: bool - ) -> None: - """Set the user's 'approved' flag to the given value. - - The boolean will be turned into an int (in update_user_approval_status_txn) - because the column is a smallint. - - Args: - user_id: the user to update the flag for. - approved: the value to set the flag to. - """ - await self.db_pool.runInteraction( - "update_user_approval_status", - self.update_user_approval_status_txn, - user_id.to_string(), - approved, - ) - @wrap_as_background_process("delete_expired_login_tokens") async def _delete_expired_login_tokens(self) -> None: """Remove login tokens with expiry dates that have passed."""