diff options
Diffstat (limited to 'synapse/rest/admin/_base.py')
-rw-r--r-- | synapse/rest/admin/_base.py | 58 |
1 files changed, 32 insertions, 26 deletions
diff --git a/synapse/rest/admin/_base.py b/synapse/rest/admin/_base.py index 5a9b08d3ef..d82eaf5e38 100644 --- a/synapse/rest/admin/_base.py +++ b/synapse/rest/admin/_base.py @@ -15,9 +15,11 @@ import re -from twisted.internet import defer +import twisted.web.server +import synapse.api.auth from synapse.api.errors import AuthError +from synapse.types import UserID def historical_admin_path_patterns(path_regex): @@ -31,7 +33,7 @@ def historical_admin_path_patterns(path_regex): Note that this should only be used for existing endpoints: new ones should just register for the /_synapse/admin path. """ - return list( + return [ re.compile(prefix + path_regex) for prefix in ( "^/_synapse/admin/v1", @@ -39,46 +41,50 @@ def historical_admin_path_patterns(path_regex): "^/_matrix/client/unstable/admin", "^/_matrix/client/r0/admin", ) - ) + ] -@defer.inlineCallbacks -def assert_requester_is_admin(auth, request): - """Verify that the requester is an admin user - - WARNING: MAKE SURE YOU YIELD ON THE RESULT! +def admin_patterns(path_regex: str): + """Returns the list of patterns for an admin endpoint Args: - auth (synapse.api.auth.Auth): - request (twisted.web.server.Request): incoming request + path_regex: The regex string to match. This should NOT have a ^ + as this will be prefixed. Returns: - Deferred + A list of regex patterns. + """ + admin_prefix = "^/_synapse/admin/v1" + patterns = [re.compile(admin_prefix + path_regex)] + return patterns + + +async def assert_requester_is_admin( + auth: synapse.api.auth.Auth, request: twisted.web.server.Request +) -> None: + """Verify that the requester is an admin user + + Args: + auth: api.auth.Auth singleton + request: incoming request Raises: - AuthError if the requester is not an admin + AuthError if the requester is not a server admin """ - requester = yield auth.get_user_by_req(request) - yield assert_user_is_admin(auth, requester.user) + requester = await auth.get_user_by_req(request) + await assert_user_is_admin(auth, requester.user) -@defer.inlineCallbacks -def assert_user_is_admin(auth, user_id): +async def assert_user_is_admin(auth: synapse.api.auth.Auth, user_id: UserID) -> None: """Verify that the given user is an admin user - WARNING: MAKE SURE YOU YIELD ON THE RESULT! - Args: - auth (synapse.api.auth.Auth): - user_id (UserID): - - Returns: - Deferred + auth: api.auth.Auth singleton + user_id: user to check Raises: - AuthError if the user is not an admin + AuthError if the user is not a server admin """ - - is_admin = yield auth.is_server_admin(user_id) + is_admin = await auth.is_server_admin(user_id) if not is_admin: raise AuthError(403, "You are not a server admin") |