diff --git a/changelog.d/18313.misc b/changelog.d/18313.misc
new file mode 100644
index 0000000000..febf3ac06e
--- /dev/null
+++ b/changelog.d/18313.misc
@@ -0,0 +1 @@
+Allow a few admin APIs used by matrix-authentication-service to run on workers.
diff --git a/docs/workers.md b/docs/workers.md
index 2597e78217..45a00696f3 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -323,6 +323,15 @@ For multiple workers not handling the SSO endpoints properly, see
[#7530](https://github.com/matrix-org/synapse/issues/7530) and
[#9427](https://github.com/matrix-org/synapse/issues/9427).
+Additionally, when MSC3861 is enabled (`experimental_features.msc3861.enabled`
+set to `true`), the following endpoints can be handled by the worker:
+
+ ^/_synapse/admin/v2/users/[^/]+$
+ ^/_synapse/admin/v1/username_available$
+ ^/_synapse/admin/v1/users/[^/]+/_allow_cross_signing_replacement_without_uia$
+ # Only the GET method:
+ ^/_synapse/admin/v1/users/[^/]+/devices$
+
Note that a [HTTP listener](usage/configuration/config_documentation.md#listeners)
with `client` and `federation` `resources` must be configured in the
[`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners)
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index e4120ed424..f495d5b7e4 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -51,8 +51,7 @@ from synapse.http.server import JsonResource, OptionsResource
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
-from synapse.rest import ClientRestResource
-from synapse.rest.admin import AdminRestResource, register_servlets_for_media_repo
+from synapse.rest import ClientRestResource, admin
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyResource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
@@ -176,8 +175,13 @@ class GenericWorkerServer(HomeServer):
def _listen_http(self, listener_config: ListenerConfig) -> None:
assert listener_config.http_options is not None
- # We always include a health resource.
- resources: Dict[str, Resource] = {"/health": HealthResource()}
+ # We always include an admin resource that we populate with servlets as needed
+ admin_resource = JsonResource(self, canonical_json=False)
+ resources: Dict[str, Resource] = {
+ # We always include a health resource.
+ "/health": HealthResource(),
+ "/_synapse/admin": admin_resource,
+ }
for res in listener_config.http_options.resources:
for name in res.names:
@@ -190,7 +194,7 @@ class GenericWorkerServer(HomeServer):
resources.update(build_synapse_client_resource_tree(self))
resources["/.well-known"] = well_known_resource(self)
- resources["/_synapse/admin"] = AdminRestResource(self)
+ admin.register_servlets(self, admin_resource)
elif name == "federation":
resources[FEDERATION_PREFIX] = TransportLayerServer(self)
@@ -200,15 +204,13 @@ class GenericWorkerServer(HomeServer):
# We need to serve the admin servlets for media on the
# worker.
- admin_resource = JsonResource(self, canonical_json=False)
- register_servlets_for_media_repo(self, admin_resource)
+ admin.register_servlets_for_media_repo(self, admin_resource)
resources.update(
{
MEDIA_R0_PREFIX: media_repo,
MEDIA_V3_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
- "/_synapse/admin": admin_resource,
}
)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 2a824e8457..6da2194cf7 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -54,6 +54,7 @@ from synapse.config.server import ListenerConfig, TCPListenerConfig
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import (
+ JsonResource,
OptionsResource,
RootOptionsRedirectResource,
StaticResource,
@@ -61,8 +62,7 @@ from synapse.http.server import (
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
-from synapse.rest import ClientRestResource
-from synapse.rest.admin import AdminRestResource
+from synapse.rest import ClientRestResource, admin
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyResource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
@@ -180,11 +180,14 @@ class SynapseHomeServer(HomeServer):
if compress:
client_resource = gz_wrap(client_resource)
+ admin_resource = JsonResource(self, canonical_json=False)
+ admin.register_servlets(self, admin_resource)
+
resources.update(
{
CLIENT_API_PREFIX: client_resource,
"/.well-known": well_known_resource(self),
- "/_synapse/admin": AdminRestResource(self),
+ "/_synapse/admin": admin_resource,
**build_synapse_client_resource_tree(self),
}
)
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index 29cc03d71d..94301add9e 100644
--- a/synapse/handlers/set_password.py
+++ b/synapse/handlers/set_password.py
@@ -36,10 +36,17 @@ class SetPasswordHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._auth_handler = hs.get_auth_handler()
- # This can only be instantiated on the main process.
- device_handler = hs.get_device_handler()
- assert isinstance(device_handler, DeviceHandler)
- self._device_handler = device_handler
+
+ # We don't need the device handler if password changing is disabled.
+ # This allows us to instantiate the SetPasswordHandler on the workers
+ # that have admin APIs for MAS
+ if self._auth_handler.can_change_password():
+ # This can only be instantiated on the main process.
+ device_handler = hs.get_device_handler()
+ assert isinstance(device_handler, DeviceHandler)
+ self._device_handler: Optional[DeviceHandler] = device_handler
+ else:
+ self._device_handler = None
async def set_password(
self,
@@ -51,6 +58,9 @@ class SetPasswordHandler:
if not self._auth_handler.can_change_password():
raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN)
+ # We should have this available only if password changing is enabled.
+ assert self._device_handler is not None
+
try:
await self.store.user_set_password_hash(user_id, password_hash)
except StoreError as e:
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 2f1ef84e26..00f108de08 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -187,7 +187,6 @@ class ClientRestResource(JsonResource):
mutual_rooms.register_servlets,
login_token_request.register_servlets,
rendezvous.register_servlets,
- auth_metadata.register_servlets,
]:
continue
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index cf809d1a27..b1335fed66 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -39,7 +39,7 @@ from typing import TYPE_CHECKING, Optional, Tuple
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.handlers.pagination import PURGE_HISTORY_ACTION_NAME
-from synapse.http.server import HttpServer, JsonResource
+from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
@@ -51,6 +51,7 @@ from synapse.rest.admin.background_updates import (
from synapse.rest.admin.devices import (
DeleteDevicesRestServlet,
DeviceRestServlet,
+ DevicesGetRestServlet,
DevicesRestServlet,
)
from synapse.rest.admin.event_reports import (
@@ -264,14 +265,6 @@ class PurgeHistoryStatusRestServlet(RestServlet):
########################################################################################
-class AdminRestResource(JsonResource):
- """The REST resource which gets mounted at /_synapse/admin"""
-
- def __init__(self, hs: "HomeServer"):
- JsonResource.__init__(self, hs, canonical_json=False)
- register_servlets(hs, self)
-
-
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
"""
Register all the admin servlets.
@@ -280,6 +273,10 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
# Admin servlets below may not work on workers.
if hs.config.worker.worker_app is not None:
+ # Some admin servlets can be mounted on workers when MSC3861 is enabled.
+ if hs.config.experimental.msc3861.enabled:
+ register_servlets_for_msc3861_delegation(hs, http_server)
+
return
register_servlets_for_client_rest_resource(hs, http_server)
@@ -367,4 +364,16 @@ def register_servlets_for_client_rest_resource(
ListMediaInRoom(hs).register(http_server)
# don't add more things here: new servlets should only be exposed on
- # /_synapse/admin so should not go here. Instead register them in AdminRestResource.
+ # /_synapse/admin so should not go here. Instead register them in register_servlets.
+
+
+def register_servlets_for_msc3861_delegation(
+ hs: "HomeServer", http_server: HttpServer
+) -> None:
+ """Register servlets needed by MAS when MSC3861 is enabled"""
+ assert hs.config.experimental.msc3861.enabled
+
+ UserRestServletV2(hs).register(http_server)
+ UsernameAvailableRestServlet(hs).register(http_server)
+ UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
+ DevicesGetRestServlet(hs).register(http_server)
diff --git a/synapse/rest/admin/devices.py b/synapse/rest/admin/devices.py
index 449b066923..125ed8c491 100644
--- a/synapse/rest/admin/devices.py
+++ b/synapse/rest/admin/devices.py
@@ -113,18 +113,19 @@ class DeviceRestServlet(RestServlet):
return HTTPStatus.OK, {}
-class DevicesRestServlet(RestServlet):
+class DevicesGetRestServlet(RestServlet):
"""
Retrieve the given user's devices
+
+ This can be mounted on workers as it is read-only, as opposed
+ to `DevicesRestServlet`.
"""
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2")
def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
- handler = hs.get_device_handler()
- assert isinstance(handler, DeviceHandler)
- self.device_handler = handler
+ self.device_worker_handler = hs.get_device_handler()
self.store = hs.get_datastores().main
self.is_mine = hs.is_mine
@@ -141,9 +142,24 @@ class DevicesRestServlet(RestServlet):
if u is None:
raise NotFoundError("Unknown user")
- devices = await self.device_handler.get_devices_by_user(target_user.to_string())
+ devices = await self.device_worker_handler.get_devices_by_user(
+ target_user.to_string()
+ )
return HTTPStatus.OK, {"devices": devices, "total": len(devices)}
+
+class DevicesRestServlet(DevicesGetRestServlet):
+ """
+ Retrieve the given user's devices
+ """
+
+ PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2")
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+ assert isinstance(self.device_worker_handler, DeviceHandler)
+ self.device_handler = self.device_worker_handler
+
async def on_POST(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index b4c7069958..341e7014d6 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1501,6 +1501,45 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
"delete_old_otks_for_next_user_batch", impl
)
+ async def allow_master_cross_signing_key_replacement_without_uia(
+ self, user_id: str, duration_ms: int
+ ) -> Optional[int]:
+ """Mark this user's latest master key as being replaceable without UIA.
+
+ Said replacement will only be permitted for a short time after calling this
+ function. That time period is controlled by the duration argument.
+
+ Returns:
+ None, if there is no such key.
+ Otherwise, the timestamp before which replacement is allowed without UIA.
+ """
+ timestamp = self._clock.time_msec() + duration_ms
+
+ def impl(txn: LoggingTransaction) -> Optional[int]:
+ txn.execute(
+ """
+ UPDATE e2e_cross_signing_keys
+ SET updatable_without_uia_before_ms = ?
+ WHERE stream_id = (
+ SELECT stream_id
+ FROM e2e_cross_signing_keys
+ WHERE user_id = ? AND keytype = 'master'
+ ORDER BY stream_id DESC
+ LIMIT 1
+ )
+ """,
+ (timestamp, user_id),
+ )
+ if txn.rowcount == 0:
+ return None
+
+ return timestamp
+
+ return await self.db_pool.runInteraction(
+ "allow_master_cross_signing_key_replacement_without_uia",
+ impl,
+ )
+
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def __init__(
@@ -1755,42 +1794,3 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
],
desc="add_e2e_signing_key",
)
-
- async def allow_master_cross_signing_key_replacement_without_uia(
- self, user_id: str, duration_ms: int
- ) -> Optional[int]:
- """Mark this user's latest master key as being replaceable without UIA.
-
- Said replacement will only be permitted for a short time after calling this
- function. That time period is controlled by the duration argument.
-
- Returns:
- None, if there is no such key.
- Otherwise, the timestamp before which replacement is allowed without UIA.
- """
- timestamp = self._clock.time_msec() + duration_ms
-
- def impl(txn: LoggingTransaction) -> Optional[int]:
- txn.execute(
- """
- UPDATE e2e_cross_signing_keys
- SET updatable_without_uia_before_ms = ?
- WHERE stream_id = (
- SELECT stream_id
- FROM e2e_cross_signing_keys
- WHERE user_id = ? AND keytype = 'master'
- ORDER BY stream_id DESC
- LIMIT 1
- )
- """,
- (timestamp, user_id),
- )
- if txn.rowcount == 0:
- return None
-
- return timestamp
-
- return await self.db_pool.runInteraction(
- "allow_master_cross_signing_key_replacement_without_uia",
- impl,
- )
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index c43f31353b..1aeae951c5 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -2105,6 +2105,136 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
func=is_user_approved_txn,
)
+ async def set_user_deactivated_status(
+ self, user_id: str, deactivated: bool
+ ) -> None:
+ """Set the `deactivated` property for the provided user to the provided value.
+
+ Args:
+ user_id: The ID of the user to set the status for.
+ deactivated: The value to set for `deactivated`.
+ """
+
+ await self.db_pool.runInteraction(
+ "set_user_deactivated_status",
+ self.set_user_deactivated_status_txn,
+ user_id,
+ deactivated,
+ )
+
+ def set_user_deactivated_status_txn(
+ self, txn: LoggingTransaction, user_id: str, deactivated: bool
+ ) -> None:
+ self.db_pool.simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"deactivated": 1 if deactivated else 0},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_deactivated_status, (user_id,)
+ )
+ self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
+ self._invalidate_cache_and_stream(txn, self.is_guest, (user_id,))
+
+ async def set_user_suspended_status(self, user_id: str, suspended: bool) -> None:
+ """
+ Set whether the user's account is suspended in the `users` table.
+
+ Args:
+ user_id: The user ID of the user in question
+ suspended: True if the user is suspended, false if not
+ """
+ await self.db_pool.runInteraction(
+ "set_user_suspended_status",
+ self.set_user_suspended_status_txn,
+ user_id,
+ suspended,
+ )
+
+ def set_user_suspended_status_txn(
+ self, txn: LoggingTransaction, user_id: str, suspended: bool
+ ) -> None:
+ self.db_pool.simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"suspended": suspended},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_suspended_status, (user_id,)
+ )
+ self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
+
+ async def set_user_locked_status(self, user_id: str, locked: bool) -> None:
+ """Set the `locked` property for the provided user to the provided value.
+
+ Args:
+ user_id: The ID of the user to set the status for.
+ locked: The value to set for `locked`.
+ """
+
+ await self.db_pool.runInteraction(
+ "set_user_locked_status",
+ self.set_user_locked_status_txn,
+ user_id,
+ locked,
+ )
+
+ def set_user_locked_status_txn(
+ self, txn: LoggingTransaction, user_id: str, locked: bool
+ ) -> None:
+ self.db_pool.simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"locked": locked},
+ )
+ self._invalidate_cache_and_stream(txn, self.get_user_locked_status, (user_id,))
+ self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
+
+ async def update_user_approval_status(
+ self, user_id: UserID, approved: bool
+ ) -> None:
+ """Set the user's 'approved' flag to the given value.
+
+ The boolean will be turned into an int (in update_user_approval_status_txn)
+ because the column is a smallint.
+
+ Args:
+ user_id: the user to update the flag for.
+ approved: the value to set the flag to.
+ """
+ await self.db_pool.runInteraction(
+ "update_user_approval_status",
+ self.update_user_approval_status_txn,
+ user_id.to_string(),
+ approved,
+ )
+
+ def update_user_approval_status_txn(
+ self, txn: LoggingTransaction, user_id: str, approved: bool
+ ) -> None:
+ """Set the user's 'approved' flag to the given value.
+
+ The boolean is turned into an int because the column is a smallint.
+
+ Args:
+ txn: the current database transaction.
+ user_id: the user to update the flag for.
+ approved: the value to set the flag to.
+ """
+ self.db_pool.simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"approved": approved},
+ )
+
+ # Invalidate the caches of methods that read the value of the 'approved' flag.
+ self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
+ self._invalidate_cache_and_stream(txn, self.is_user_approved, (user_id,))
+
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
def __init__(
@@ -2217,117 +2347,6 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
return nb_processed
- async def set_user_deactivated_status(
- self, user_id: str, deactivated: bool
- ) -> None:
- """Set the `deactivated` property for the provided user to the provided value.
-
- Args:
- user_id: The ID of the user to set the status for.
- deactivated: The value to set for `deactivated`.
- """
-
- await self.db_pool.runInteraction(
- "set_user_deactivated_status",
- self.set_user_deactivated_status_txn,
- user_id,
- deactivated,
- )
-
- def set_user_deactivated_status_txn(
- self, txn: LoggingTransaction, user_id: str, deactivated: bool
- ) -> None:
- self.db_pool.simple_update_one_txn(
- txn=txn,
- table="users",
- keyvalues={"name": user_id},
- updatevalues={"deactivated": 1 if deactivated else 0},
- )
- self._invalidate_cache_and_stream(
- txn, self.get_user_deactivated_status, (user_id,)
- )
- self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
- txn.call_after(self.is_guest.invalidate, (user_id,))
-
- async def set_user_suspended_status(self, user_id: str, suspended: bool) -> None:
- """
- Set whether the user's account is suspended in the `users` table.
-
- Args:
- user_id: The user ID of the user in question
- suspended: True if the user is suspended, false if not
- """
- await self.db_pool.runInteraction(
- "set_user_suspended_status",
- self.set_user_suspended_status_txn,
- user_id,
- suspended,
- )
-
- def set_user_suspended_status_txn(
- self, txn: LoggingTransaction, user_id: str, suspended: bool
- ) -> None:
- self.db_pool.simple_update_one_txn(
- txn=txn,
- table="users",
- keyvalues={"name": user_id},
- updatevalues={"suspended": suspended},
- )
- self._invalidate_cache_and_stream(
- txn, self.get_user_suspended_status, (user_id,)
- )
- self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
-
- async def set_user_locked_status(self, user_id: str, locked: bool) -> None:
- """Set the `locked` property for the provided user to the provided value.
-
- Args:
- user_id: The ID of the user to set the status for.
- locked: The value to set for `locked`.
- """
-
- await self.db_pool.runInteraction(
- "set_user_locked_status",
- self.set_user_locked_status_txn,
- user_id,
- locked,
- )
-
- def set_user_locked_status_txn(
- self, txn: LoggingTransaction, user_id: str, locked: bool
- ) -> None:
- self.db_pool.simple_update_one_txn(
- txn=txn,
- table="users",
- keyvalues={"name": user_id},
- updatevalues={"locked": locked},
- )
- self._invalidate_cache_and_stream(txn, self.get_user_locked_status, (user_id,))
- self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
-
- def update_user_approval_status_txn(
- self, txn: LoggingTransaction, user_id: str, approved: bool
- ) -> None:
- """Set the user's 'approved' flag to the given value.
-
- The boolean is turned into an int because the column is a smallint.
-
- Args:
- txn: the current database transaction.
- user_id: the user to update the flag for.
- approved: the value to set the flag to.
- """
- self.db_pool.simple_update_one_txn(
- txn=txn,
- table="users",
- keyvalues={"name": user_id},
- updatevalues={"approved": approved},
- )
-
- # Invalidate the caches of methods that read the value of the 'approved' flag.
- self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
- self._invalidate_cache_and_stream(txn, self.is_user_approved, (user_id,))
-
class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
def __init__(
@@ -2956,25 +2975,6 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
start_or_continue_validation_session_txn,
)
- async def update_user_approval_status(
- self, user_id: UserID, approved: bool
- ) -> None:
- """Set the user's 'approved' flag to the given value.
-
- The boolean will be turned into an int (in update_user_approval_status_txn)
- because the column is a smallint.
-
- Args:
- user_id: the user to update the flag for.
- approved: the value to set the flag to.
- """
- await self.db_pool.runInteraction(
- "update_user_approval_status",
- self.update_user_approval_status_txn,
- user_id.to_string(),
- approved,
- )
-
@wrap_as_background_process("delete_expired_login_tokens")
async def _delete_expired_login_tokens(self) -> None:
"""Remove login tokens with expiry dates that have passed."""
|