From d790d0d314c64c6c03ee88e74853a59e2781d6ae Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Wed, 3 Mar 2021 14:09:39 +0100 Subject: Add type hints to user admin API. (#9521) --- synapse/rest/admin/users.py | 85 +++++++++++++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 29 deletions(-) (limited to 'synapse/rest') diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 9c701c7348..267a993430 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -16,7 +16,7 @@ import hashlib import hmac import logging from http import HTTPStatus -from typing import TYPE_CHECKING, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from synapse.api.constants import UserTypes from synapse.api.errors import Codes, NotFoundError, SynapseError @@ -47,13 +47,15 @@ logger = logging.getLogger(__name__) class UsersRestServlet(RestServlet): PATTERNS = admin_patterns("/users/(?P[^/]*)$") - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() self.admin_handler = hs.get_admin_handler() - async def on_GET(self, request, user_id): + async def on_GET( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, List[JsonDict]]: target_user = UserID.from_string(user_id) await assert_requester_is_admin(self.auth, request) @@ -153,7 +155,7 @@ class UserRestServletV2(RestServlet): otherwise an error. """ - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() self.admin_handler = hs.get_admin_handler() @@ -165,7 +167,9 @@ class UserRestServletV2(RestServlet): self.registration_handler = hs.get_registration_handler() self.pusher_pool = hs.get_pusherpool() - async def on_GET(self, request, user_id): + async def on_GET( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) target_user = UserID.from_string(user_id) @@ -179,7 +183,9 @@ class UserRestServletV2(RestServlet): return 200, ret - async def on_PUT(self, request, user_id): + async def on_PUT( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) await assert_user_is_admin(self.auth, requester.user) @@ -273,6 +279,8 @@ class UserRestServletV2(RestServlet): ) user = await self.admin_handler.get_user(target_user) + assert user is not None + return 200, user else: # create user @@ -330,9 +338,10 @@ class UserRestServletV2(RestServlet): target_user, requester, body["avatar_url"], True ) - ret = await self.admin_handler.get_user(target_user) + user = await self.admin_handler.get_user(target_user) + assert user is not None - return 201, ret + return 201, user class UserRegisterServlet(RestServlet): @@ -346,10 +355,10 @@ class UserRegisterServlet(RestServlet): PATTERNS = admin_patterns("/register") NONCE_TIMEOUT = 60 - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.auth_handler = hs.get_auth_handler() self.reactor = hs.get_reactor() - self.nonces = {} + self.nonces = {} # type: Dict[str, int] self.hs = hs def _clear_old_nonces(self): @@ -362,7 +371,7 @@ class UserRegisterServlet(RestServlet): if now - v > self.NONCE_TIMEOUT: del self.nonces[k] - def on_GET(self, request): + def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: """ Generate a new nonce. """ @@ -372,7 +381,7 @@ class UserRegisterServlet(RestServlet): self.nonces[nonce] = int(self.reactor.seconds()) return 200, {"nonce": nonce} - async def on_POST(self, request): + async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: self._clear_old_nonces() if not self.hs.config.registration_shared_secret: @@ -478,12 +487,14 @@ class WhoisRestServlet(RestServlet): client_patterns("/admin" + path_regex, v1=True) ) - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() self.admin_handler = hs.get_admin_handler() - async def on_GET(self, request, user_id): + async def on_GET( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: target_user = UserID.from_string(user_id) requester = await self.auth.get_user_by_req(request) auth_user = requester.user @@ -508,7 +519,9 @@ class DeactivateAccountRestServlet(RestServlet): self.is_mine = hs.is_mine self.store = hs.get_datastore() - async def on_POST(self, request: str, target_user_id: str) -> Tuple[int, JsonDict]: + async def on_POST( + self, request: SynapseRequest, target_user_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) await assert_user_is_admin(self.auth, requester.user) @@ -550,7 +563,7 @@ class AccountValidityRenewServlet(RestServlet): self.account_activity_handler = hs.get_account_validity_handler() self.auth = hs.get_auth() - async def on_POST(self, request): + async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) body = parse_json_object_from_request(request) @@ -584,14 +597,16 @@ class ResetPasswordRestServlet(RestServlet): PATTERNS = admin_patterns("/reset_password/(?P[^/]*)") - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() self.hs = hs self.auth = hs.get_auth() self.auth_handler = hs.get_auth_handler() self._set_password_handler = hs.get_set_password_handler() - async def on_POST(self, request, target_user_id): + async def on_POST( + self, request: SynapseRequest, target_user_id: str + ) -> Tuple[int, JsonDict]: """Post request to allow an administrator reset password for a user. This needs user to have administrator access in Synapse. """ @@ -626,12 +641,14 @@ class SearchUsersRestServlet(RestServlet): PATTERNS = admin_patterns("/search_users/(?P[^/]*)") - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() - async def on_GET(self, request, target_user_id): + async def on_GET( + self, request: SynapseRequest, target_user_id: str + ) -> Tuple[int, Optional[List[JsonDict]]]: """Get request to search user table for specific users according to search term. This needs user to have a administrator access in Synapse. @@ -682,12 +699,14 @@ class UserAdminServlet(RestServlet): PATTERNS = admin_patterns("/users/(?P[^/]*)/admin$") - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() - async def on_GET(self, request, user_id): + async def on_GET( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) target_user = UserID.from_string(user_id) @@ -699,7 +718,9 @@ class UserAdminServlet(RestServlet): return 200, {"admin": is_admin} - async def on_PUT(self, request, user_id): + async def on_PUT( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) await assert_user_is_admin(self.auth, requester.user) auth_user = requester.user @@ -730,12 +751,14 @@ class UserMembershipRestServlet(RestServlet): PATTERNS = admin_patterns("/users/(?P[^/]+)/joined_rooms$") - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.is_mine = hs.is_mine self.auth = hs.get_auth() self.store = hs.get_datastore() - async def on_GET(self, request, user_id): + async def on_GET( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) room_ids = await self.store.get_rooms_for_user(user_id) @@ -758,7 +781,7 @@ class PushersRestServlet(RestServlet): PATTERNS = admin_patterns("/users/(?P[^/]*)/pushers$") - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.is_mine = hs.is_mine self.store = hs.get_datastore() self.auth = hs.get_auth() @@ -799,7 +822,7 @@ class UserMediaRestServlet(RestServlet): PATTERNS = admin_patterns("/users/(?P[^/]+)/media$") - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.is_mine = hs.is_mine self.auth = hs.get_auth() self.store = hs.get_datastore() @@ -891,7 +914,9 @@ class UserTokenRestServlet(RestServlet): self.auth = hs.get_auth() self.auth_handler = hs.get_auth_handler() - async def on_POST(self, request, user_id): + async def on_POST( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) await assert_user_is_admin(self.auth, requester.user) auth_user = requester.user @@ -943,7 +968,9 @@ class ShadowBanRestServlet(RestServlet): self.store = hs.get_datastore() self.auth = hs.get_auth() - async def on_POST(self, request, user_id): + async def on_POST( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) if not self.hs.is_mine_id(user_id): -- cgit 1.4.1