diff options
Diffstat (limited to 'tests/rest/admin/test_user.py')
-rw-r--r-- | tests/rest/admin/test_user.py | 262 |
1 files changed, 133 insertions, 129 deletions
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 9711405735..272637e965 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -23,13 +23,17 @@ from unittest.mock import Mock, patch from parameterized import parameterized, parameterized_class +from twisted.test.proto_helpers import MemoryReactor + import synapse.rest.admin from synapse.api.constants import UserTypes from synapse.api.errors import Codes, HttpResponseException, ResourceLimitError from synapse.api.room_versions import RoomVersions from synapse.rest.client import devices, login, logout, profile, room, sync from synapse.rest.media.v1.filepath import MediaFilePaths +from synapse.server import HomeServer from synapse.types import JsonDict, UserID +from synapse.util import Clock from tests import unittest from tests.server import FakeSite, make_request @@ -44,7 +48,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): profile.register_servlets, ] - def make_homeserver(self, reactor, clock): + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: self.url = "/_synapse/admin/v1/register" @@ -61,12 +65,12 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): self.hs.config.registration.registration_shared_secret = "shared" - self.hs.get_media_repository = Mock() - self.hs.get_deactivate_account_handler = Mock() + self.hs.get_media_repository = Mock() # type: ignore[assignment] + self.hs.get_deactivate_account_handler = Mock() # type: ignore[assignment] return self.hs - def test_disabled(self): + def test_disabled(self) -> None: """ If there is no shared secret, registration through this method will be prevented. @@ -80,7 +84,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): "Shared secret registration is not enabled", channel.json_body["error"] ) - def test_get_nonce(self): + def test_get_nonce(self) -> None: """ Calling GET on the endpoint will return a randomised nonce, using the homeserver's secrets provider. @@ -93,7 +97,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.json_body, {"nonce": "abcd"}) - def test_expired_nonce(self): + def test_expired_nonce(self) -> None: """ Calling GET on the endpoint will return a randomised nonce, which will only last for SALT_TIMEOUT (60s). @@ -118,7 +122,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual("unrecognised nonce", channel.json_body["error"]) - def test_register_incorrect_nonce(self): + def test_register_incorrect_nonce(self) -> None: """ Only the provided nonce can be used, as it's checked in the MAC. """ @@ -141,7 +145,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) self.assertEqual("HMAC incorrect", channel.json_body["error"]) - def test_register_correct_nonce(self): + def test_register_correct_nonce(self) -> None: """ When the correct nonce is provided, and the right key is provided, the user is registered. @@ -168,7 +172,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self.assertEqual("@bob:test", channel.json_body["user_id"]) - def test_nonce_reuse(self): + def test_nonce_reuse(self) -> None: """ A valid unrecognised nonce. """ @@ -197,14 +201,14 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual("unrecognised nonce", channel.json_body["error"]) - def test_missing_parts(self): + def test_missing_parts(self) -> None: """ Synapse will complain if you don't give nonce, username, password, and mac. Admin and user_types are optional. Additional checks are done for length and type. """ - def nonce(): + def nonce() -> str: channel = self.make_request("GET", self.url) return channel.json_body["nonce"] @@ -297,7 +301,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual("Invalid user type", channel.json_body["error"]) - def test_displayname(self): + def test_displayname(self) -> None: """ Test that displayname of new user is set """ @@ -400,7 +404,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): @override_config( {"limit_usage_by_mau": True, "max_mau_value": 2, "mau_trial_days": 0} ) - def test_register_mau_limit_reached(self): + def test_register_mau_limit_reached(self) -> None: """ Check we can register a user via the shared secret registration API even if the MAU limit is reached. @@ -450,13 +454,13 @@ class UsersListTestCase(unittest.HomeserverTestCase): ] url = "/_synapse/admin/v2/users" - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastore() self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") - def test_no_auth(self): + def test_no_auth(self) -> None: """ Try to list users without authentication. """ @@ -465,7 +469,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) - def test_requester_is_no_admin(self): + def test_requester_is_no_admin(self) -> None: """ If the user is not a server admin, an error is returned. """ @@ -477,7 +481,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) - def test_all_users(self): + def test_all_users(self) -> None: """ List all users, including deactivated users. """ @@ -497,7 +501,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): # Check that all fields are available self._check_fields(channel.json_body["users"]) - def test_search_term(self): + def test_search_term(self) -> None: """Test that searching for a users works correctly""" def _search_test( @@ -505,7 +509,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): search_term: str, search_field: Optional[str] = "name", expected_http_code: Optional[int] = HTTPStatus.OK, - ): + ) -> None: """Search for a user and check that the returned user's id is a match Args: @@ -575,7 +579,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): _search_test(None, "foo", "user_id") _search_test(None, "bar", "user_id") - def test_invalid_parameter(self): + def test_invalid_parameter(self) -> None: """ If parameters are invalid, an error is returned. """ @@ -640,7 +644,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"]) - def test_limit(self): + def test_limit(self) -> None: """ Testing list of users with limit """ @@ -661,7 +665,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.json_body["next_token"], "5") self._check_fields(channel.json_body["users"]) - def test_from(self): + def test_from(self) -> None: """ Testing list of users with a defined starting point (from) """ @@ -682,7 +686,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.assertNotIn("next_token", channel.json_body) self._check_fields(channel.json_body["users"]) - def test_limit_and_from(self): + def test_limit_and_from(self) -> None: """ Testing list of users with a defined starting point and limit """ @@ -703,7 +707,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.assertEqual(len(channel.json_body["users"]), 10) self._check_fields(channel.json_body["users"]) - def test_next_token(self): + def test_next_token(self) -> None: """ Testing that `next_token` appears at the right place """ @@ -765,7 +769,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.assertEqual(len(channel.json_body["users"]), 1) self.assertNotIn("next_token", channel.json_body) - def test_order_by(self): + def test_order_by(self) -> None: """ Testing order list with parameter `order_by` """ @@ -843,7 +847,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): expected_user_list: List[str], order_by: Optional[str], dir: Optional[str] = None, - ): + ) -> None: """Request the list of users in a certain order. Assert that order is what we expect Args: @@ -870,7 +874,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.assertEqual(expected_user_list, returned_order) self._check_fields(channel.json_body["users"]) - def _check_fields(self, content: List[JsonDict]): + def _check_fields(self, content: List[JsonDict]) -> None: """Checks that the expected user attributes are present in content Args: content: List that is checked for content @@ -886,7 +890,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.assertIn("avatar_url", u) self.assertIn("creation_ts", u) - def _create_users(self, number_users: int): + def _create_users(self, number_users: int) -> None: """ Create a number of users Args: @@ -908,7 +912,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): login.register_servlets, ] - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastore() self.admin_user = self.register_user("admin", "pass", admin=True) @@ -931,7 +935,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self.store.user_add_threepid("@user:test", "email", "foo@bar.com", 0, 0) ) - def test_no_auth(self): + def test_no_auth(self) -> None: """ Try to deactivate users without authentication. """ @@ -940,7 +944,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) - def test_requester_is_not_admin(self): + def test_requester_is_not_admin(self) -> None: """ If the user is not a server admin, an error is returned. """ @@ -961,7 +965,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) self.assertEqual("You are not a server admin", channel.json_body["error"]) - def test_user_does_not_exist(self): + def test_user_does_not_exist(self) -> None: """ Tests that deactivation for a user that does not exist returns a HTTPStatus.NOT_FOUND """ @@ -975,7 +979,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body) self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"]) - def test_erase_is_not_bool(self): + def test_erase_is_not_bool(self) -> None: """ If parameter `erase` is not boolean, return an error """ @@ -990,7 +994,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"]) - def test_user_is_not_local(self): + def test_user_is_not_local(self) -> None: """ Tests that deactivation for a user that is not a local returns a HTTPStatus.BAD_REQUEST """ @@ -1001,7 +1005,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual("Can only deactivate local users", channel.json_body["error"]) - def test_deactivate_user_erase_true(self): + def test_deactivate_user_erase_true(self) -> None: """ Test deactivating a user and set `erase` to `true` """ @@ -1046,7 +1050,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self._is_erased("@user:test", True) - def test_deactivate_user_erase_false(self): + def test_deactivate_user_erase_false(self) -> None: """ Test deactivating a user and set `erase` to `false` """ @@ -1091,7 +1095,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self._is_erased("@user:test", False) - def test_deactivate_user_erase_true_no_profile(self): + def test_deactivate_user_erase_true_no_profile(self) -> None: """ Test deactivating a user and set `erase` to `true` if user has no profile information (stored in the database table `profiles`). @@ -1162,7 +1166,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): sync.register_servlets, ] - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastore() self.auth_handler = hs.get_auth_handler() @@ -1185,7 +1189,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.url_prefix = "/_synapse/admin/v2/users/%s" self.url_other_user = self.url_prefix % self.other_user - def test_requester_is_no_admin(self): + def test_requester_is_no_admin(self) -> None: """ If the user is not a server admin, an error is returned. """ @@ -1210,7 +1214,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) self.assertEqual("You are not a server admin", channel.json_body["error"]) - def test_user_does_not_exist(self): + def test_user_does_not_exist(self) -> None: """ Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND """ @@ -1224,7 +1228,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body) self.assertEqual("M_NOT_FOUND", channel.json_body["errcode"]) - def test_invalid_parameter(self): + def test_invalid_parameter(self) -> None: """ If parameters are invalid, an error is returned. """ @@ -1319,7 +1323,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"]) - def test_get_user(self): + def test_get_user(self) -> None: """ Test a simple get of a user. """ @@ -1334,7 +1338,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("User", channel.json_body["displayname"]) self._check_fields(channel.json_body) - def test_create_server_admin(self): + def test_create_server_admin(self) -> None: """ Check that a new admin user is created successfully. """ @@ -1383,7 +1387,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("mxc://fibble/wibble", channel.json_body["avatar_url"]) self._check_fields(channel.json_body) - def test_create_user(self): + def test_create_user(self) -> None: """ Check that a new regular user is created successfully. """ @@ -1450,7 +1454,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): @override_config( {"limit_usage_by_mau": True, "max_mau_value": 2, "mau_trial_days": 0} ) - def test_create_user_mau_limit_reached_active_admin(self): + def test_create_user_mau_limit_reached_active_admin(self) -> None: """ Check that an admin can register a new user via the admin API even if the MAU limit is reached. @@ -1496,7 +1500,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): @override_config( {"limit_usage_by_mau": True, "max_mau_value": 2, "mau_trial_days": 0} ) - def test_create_user_mau_limit_reached_passive_admin(self): + def test_create_user_mau_limit_reached_passive_admin(self) -> None: """ Check that an admin can register a new user via the admin API even if the MAU limit is reached. @@ -1541,7 +1545,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): "public_baseurl": "https://example.com", } ) - def test_create_user_email_notif_for_new_users(self): + def test_create_user_email_notif_for_new_users(self) -> None: """ Check that a new regular user is created successfully and got an email pusher. @@ -1584,7 +1588,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): "public_baseurl": "https://example.com", } ) - def test_create_user_email_no_notif_for_new_users(self): + def test_create_user_email_no_notif_for_new_users(self) -> None: """ Check that a new regular user is created successfully and got not an email pusher. @@ -1615,7 +1619,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): pushers = list(pushers) self.assertEqual(len(pushers), 0) - def test_set_password(self): + def test_set_password(self) -> None: """ Test setting a new password for another user. """ @@ -1631,7 +1635,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self._check_fields(channel.json_body) - def test_set_displayname(self): + def test_set_displayname(self) -> None: """ Test setting the displayname of another user. """ @@ -1659,7 +1663,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("@user:test", channel.json_body["name"]) self.assertEqual("foobar", channel.json_body["displayname"]) - def test_set_threepid(self): + def test_set_threepid(self) -> None: """ Test setting threepid for an other user. """ @@ -1740,7 +1744,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(0, len(channel.json_body["threepids"])) self._check_fields(channel.json_body) - def test_set_duplicate_threepid(self): + def test_set_duplicate_threepid(self) -> None: """ Test setting the same threepid for a second user. First user loses and second user gets mapping of this threepid. @@ -1827,7 +1831,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(0, len(channel.json_body["threepids"])) self._check_fields(channel.json_body) - def test_set_external_id(self): + def test_set_external_id(self) -> None: """ Test setting external id for an other user. """ @@ -1925,7 +1929,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("@user:test", channel.json_body["name"]) self.assertEqual(0, len(channel.json_body["external_ids"])) - def test_set_duplicate_external_id(self): + def test_set_duplicate_external_id(self) -> None: """ Test that setting the same external id for a second user fails and external id from user must not be changed. @@ -2048,7 +2052,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): ) self._check_fields(channel.json_body) - def test_deactivate_user(self): + def test_deactivate_user(self) -> None: """ Test deactivating another user. """ @@ -2113,7 +2117,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertNotIn("password_hash", channel.json_body) @override_config({"user_directory": {"enabled": True, "search_all_users": True}}) - def test_change_name_deactivate_user_user_directory(self): + def test_change_name_deactivate_user_user_directory(self) -> None: """ Test change profile information of a deactivated user and check that it does not appear in user directory @@ -2156,7 +2160,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): profile = self.get_success(self.store.get_user_in_directory(self.other_user)) self.assertIsNone(profile) - def test_reactivate_user(self): + def test_reactivate_user(self) -> None: """ Test reactivating another user. """ @@ -2189,7 +2193,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertNotIn("password_hash", channel.json_body) @override_config({"password_config": {"localdb_enabled": False}}) - def test_reactivate_user_localdb_disabled(self): + def test_reactivate_user_localdb_disabled(self) -> None: """ Test reactivating another user when using SSO. """ @@ -2223,7 +2227,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertNotIn("password_hash", channel.json_body) @override_config({"password_config": {"enabled": False}}) - def test_reactivate_user_password_disabled(self): + def test_reactivate_user_password_disabled(self) -> None: """ Test reactivating another user when using SSO. """ @@ -2256,7 +2260,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): # This key was removed intentionally. Ensure it is not accidentally re-included. self.assertNotIn("password_hash", channel.json_body) - def test_set_user_as_admin(self): + def test_set_user_as_admin(self) -> None: """ Test setting the admin flag on a user. """ @@ -2284,7 +2288,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("@user:test", channel.json_body["name"]) self.assertTrue(channel.json_body["admin"]) - def test_set_user_type(self): + def test_set_user_type(self) -> None: """ Test changing user type. """ @@ -2335,7 +2339,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("@user:test", channel.json_body["name"]) self.assertIsNone(channel.json_body["user_type"]) - def test_accidental_deactivation_prevention(self): + def test_accidental_deactivation_prevention(self) -> None: """ Ensure an account can't accidentally be deactivated by using a str value for the deactivated body parameter @@ -2418,7 +2422,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): # This key was removed intentionally. Ensure it is not accidentally re-included. self.assertNotIn("password_hash", channel.json_body) - def _check_fields(self, content: JsonDict): + def _check_fields(self, content: JsonDict) -> None: """Checks that the expected user attributes are present in content Args: @@ -2448,7 +2452,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): room.register_servlets, ] - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") @@ -2457,7 +2461,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): self.other_user ) - def test_no_auth(self): + def test_no_auth(self) -> None: """ Try to list rooms of an user without authentication. """ @@ -2466,7 +2470,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) - def test_requester_is_no_admin(self): + def test_requester_is_no_admin(self) -> None: """ If the user is not a server admin, an error is returned. """ @@ -2481,7 +2485,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) - def test_user_does_not_exist(self): + def test_user_does_not_exist(self) -> None: """ Tests that a lookup for a user that does not exist returns an empty list """ @@ -2496,7 +2500,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): self.assertEqual(0, channel.json_body["total"]) self.assertEqual(0, len(channel.json_body["joined_rooms"])) - def test_user_is_not_local(self): + def test_user_is_not_local(self) -> None: """ Tests that a lookup for a user that is not a local and participates in no conversation returns an empty list """ @@ -2512,7 +2516,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): self.assertEqual(0, channel.json_body["total"]) self.assertEqual(0, len(channel.json_body["joined_rooms"])) - def test_no_memberships(self): + def test_no_memberships(self) -> None: """ Tests that a normal lookup for rooms is successfully if user has no memberships @@ -2528,7 +2532,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): self.assertEqual(0, channel.json_body["total"]) self.assertEqual(0, len(channel.json_body["joined_rooms"])) - def test_get_rooms(self): + def test_get_rooms(self) -> None: """ Tests that a normal lookup for rooms is successfully """ @@ -2549,7 +2553,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): self.assertEqual(number_rooms, channel.json_body["total"]) self.assertEqual(number_rooms, len(channel.json_body["joined_rooms"])) - def test_get_rooms_with_nonlocal_user(self): + def test_get_rooms_with_nonlocal_user(self) -> None: """ Tests that a normal lookup for rooms is successful with a non-local user """ @@ -2604,7 +2608,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase): login.register_servlets, ] - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastore() self.admin_user = self.register_user("admin", "pass", admin=True) @@ -2615,7 +2619,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase): self.other_user ) - def test_no_auth(self): + def test_no_auth(self) -> None: """ Try to list pushers of an user without authentication. """ @@ -2624,7 +2628,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) - def test_requester_is_no_admin(self): + def test_requester_is_no_admin(self) -> None: """ If the user is not a server admin, an error is returned. """ @@ -2639,7 +2643,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) - def test_user_does_not_exist(self): + def test_user_does_not_exist(self) -> None: """ Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND """ @@ -2653,7 +2657,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body) self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"]) - def test_user_is_not_local(self): + def test_user_is_not_local(self) -> None: """ Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST """ @@ -2668,7 +2672,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual("Can only look up local users", channel.json_body["error"]) - def test_get_pushers(self): + def test_get_pushers(self) -> None: """ Tests that a normal lookup for pushers is successfully """ @@ -2732,7 +2736,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): login.register_servlets, ] - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastore() self.media_repo = hs.get_media_repository_resource() self.filepaths = MediaFilePaths(hs.config.media.media_store_path) @@ -2746,7 +2750,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): ) @parameterized.expand(["GET", "DELETE"]) - def test_no_auth(self, method: str): + def test_no_auth(self, method: str) -> None: """Try to list media of an user without authentication.""" channel = self.make_request(method, self.url, {}) @@ -2754,7 +2758,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) @parameterized.expand(["GET", "DELETE"]) - def test_requester_is_no_admin(self, method: str): + def test_requester_is_no_admin(self, method: str) -> None: """If the user is not a server admin, an error is returned.""" other_user_token = self.login("user", "pass") @@ -2768,7 +2772,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) @parameterized.expand(["GET", "DELETE"]) - def test_user_does_not_exist(self, method: str): + def test_user_does_not_exist(self, method: str) -> None: """Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND""" url = "/_synapse/admin/v1/users/@unknown_person:test/media" channel = self.make_request( @@ -2781,7 +2785,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"]) @parameterized.expand(["GET", "DELETE"]) - def test_user_is_not_local(self, method: str): + def test_user_is_not_local(self, method: str) -> None: """Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST""" url = "/_synapse/admin/v1/users/@unknown_person:unknown_domain/media" @@ -2794,7 +2798,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual("Can only look up local users", channel.json_body["error"]) - def test_limit_GET(self): + def test_limit_GET(self) -> None: """Testing list of media with limit""" number_media = 20 @@ -2813,7 +2817,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.json_body["next_token"], 5) self._check_fields(channel.json_body["media"]) - def test_limit_DELETE(self): + def test_limit_DELETE(self) -> None: """Testing delete of media with limit""" number_media = 20 @@ -2830,7 +2834,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.json_body["total"], 5) self.assertEqual(len(channel.json_body["deleted_media"]), 5) - def test_from_GET(self): + def test_from_GET(self) -> None: """Testing list of media with a defined starting point (from)""" number_media = 20 @@ -2849,7 +2853,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertNotIn("next_token", channel.json_body) self._check_fields(channel.json_body["media"]) - def test_from_DELETE(self): + def test_from_DELETE(self) -> None: """Testing delete of media with a defined starting point (from)""" number_media = 20 @@ -2866,7 +2870,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.json_body["total"], 15) self.assertEqual(len(channel.json_body["deleted_media"]), 15) - def test_limit_and_from_GET(self): + def test_limit_and_from_GET(self) -> None: """Testing list of media with a defined starting point and limit""" number_media = 20 @@ -2885,7 +2889,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(len(channel.json_body["media"]), 10) self._check_fields(channel.json_body["media"]) - def test_limit_and_from_DELETE(self): + def test_limit_and_from_DELETE(self) -> None: """Testing delete of media with a defined starting point and limit""" number_media = 20 @@ -2903,7 +2907,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(len(channel.json_body["deleted_media"]), 10) @parameterized.expand(["GET", "DELETE"]) - def test_invalid_parameter(self, method: str): + def test_invalid_parameter(self, method: str) -> None: """If parameters are invalid, an error is returned.""" # unkown order_by channel = self.make_request( @@ -2945,7 +2949,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"]) - def test_next_token(self): + def test_next_token(self) -> None: """ Testing that `next_token` appears at the right place @@ -3010,7 +3014,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(len(channel.json_body["media"]), 1) self.assertNotIn("next_token", channel.json_body) - def test_user_has_no_media_GET(self): + def test_user_has_no_media_GET(self) -> None: """ Tests that a normal lookup for media is successfully if user has no media created @@ -3026,7 +3030,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(0, channel.json_body["total"]) self.assertEqual(0, len(channel.json_body["media"])) - def test_user_has_no_media_DELETE(self): + def test_user_has_no_media_DELETE(self) -> None: """ Tests that a delete is successful if user has no media """ @@ -3041,7 +3045,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertEqual(0, channel.json_body["total"]) self.assertEqual(0, len(channel.json_body["deleted_media"])) - def test_get_media(self): + def test_get_media(self) -> None: """Tests that a normal lookup for media is successful""" number_media = 5 @@ -3060,7 +3064,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.assertNotIn("next_token", channel.json_body) self._check_fields(channel.json_body["media"]) - def test_delete_media(self): + def test_delete_media(self) -> None: """Tests that a normal delete of media is successful""" number_media = 5 @@ -3089,7 +3093,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): for local_path in local_paths: self.assertFalse(os.path.exists(local_path)) - def test_order_by(self): + def test_order_by(self) -> None: """ Testing order list with parameter `order_by` """ @@ -3252,7 +3256,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): return media_id - def _check_fields(self, content: List[JsonDict]): + def _check_fields(self, content: List[JsonDict]) -> None: """Checks that the expected user attributes are present in content Args: content: List that is checked for content @@ -3272,7 +3276,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): expected_media_list: List[str], order_by: Optional[str], dir: Optional[str] = None, - ): + ) -> None: """Request the list of media in a certain order. Assert that order is what we expect Args: @@ -3312,7 +3316,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): logout.register_servlets, ] - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastore() self.admin_user = self.register_user("admin", "pass", admin=True) @@ -3331,14 +3335,14 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) return channel.json_body["access_token"] - def test_no_auth(self): + def test_no_auth(self) -> None: """Try to login as a user without authentication.""" channel = self.make_request("POST", self.url, b"{}") self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) - def test_not_admin(self): + def test_not_admin(self) -> None: """Try to login as a user as a non-admin user.""" channel = self.make_request( "POST", self.url, b"{}", access_token=self.other_user_tok @@ -3346,7 +3350,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) - def test_send_event(self): + def test_send_event(self) -> None: """Test that sending event as a user works.""" # Create a room. room_id = self.helper.create_room_as(self.other_user, tok=self.other_user_tok) @@ -3360,7 +3364,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): event = self.get_success(self.store.get_event(event_id)) self.assertEqual(event.sender, self.other_user) - def test_devices(self): + def test_devices(self) -> None: """Tests that logging in as a user doesn't create a new device for them.""" # Login in as the user self._get_token() @@ -3374,7 +3378,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): # We should only see the one device (from the login in `prepare`) self.assertEqual(len(channel.json_body["devices"]), 1) - def test_logout(self): + def test_logout(self) -> None: """Test that calling `/logout` with the token works.""" # Login in as the user puppet_token = self._get_token() @@ -3397,7 +3401,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): ) self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) - def test_user_logout_all(self): + def test_user_logout_all(self) -> None: """Tests that the target user calling `/logout/all` does *not* expire the token. """ @@ -3424,7 +3428,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): ) self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body) - def test_admin_logout_all(self): + def test_admin_logout_all(self) -> None: """Tests that the admin user calling `/logout/all` does expire the token. """ @@ -3464,7 +3468,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): "form_secret": "123secret", } ) - def test_consent(self): + def test_consent(self) -> None: """Test that sending a message is not subject to the privacy policies.""" # Have the admin user accept the terms. self.get_success(self.store.user_set_consent_version(self.admin_user, "1.0")) @@ -3492,7 +3496,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): @override_config( {"limit_usage_by_mau": True, "max_mau_value": 1, "mau_trial_days": 0} ) - def test_mau_limit(self): + def test_mau_limit(self) -> None: # Create a room as the admin user. This will bump the monthly active users to 1. room_id = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok) @@ -3524,14 +3528,14 @@ class WhoisRestTestCase(unittest.HomeserverTestCase): login.register_servlets, ] - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") self.other_user = self.register_user("user", "pass") - self.url = self.url_prefix % self.other_user + self.url = self.url_prefix % self.other_user # type: ignore[attr-defined] - def test_no_auth(self): + def test_no_auth(self) -> None: """ Try to get information of an user without authentication. """ @@ -3539,7 +3543,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) - def test_requester_is_not_admin(self): + def test_requester_is_not_admin(self) -> None: """ If the user is not a server admin, an error is returned. """ @@ -3554,11 +3558,11 @@ class WhoisRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) - def test_user_is_not_local(self): + def test_user_is_not_local(self) -> None: """ Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST """ - url = self.url_prefix % "@unknown_person:unknown_domain" + url = self.url_prefix % "@unknown_person:unknown_domain" # type: ignore[attr-defined] channel = self.make_request( "GET", @@ -3568,7 +3572,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual("Can only whois a local user", channel.json_body["error"]) - def test_get_whois_admin(self): + def test_get_whois_admin(self) -> None: """ The lookup should succeed for an admin. """ @@ -3581,7 +3585,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase): self.assertEqual(self.other_user, channel.json_body["user_id"]) self.assertIn("devices", channel.json_body) - def test_get_whois_user(self): + def test_get_whois_user(self) -> None: """ The lookup should succeed for a normal user looking up their own information. """ @@ -3604,7 +3608,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase): login.register_servlets, ] - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastore() self.admin_user = self.register_user("admin", "pass", admin=True) @@ -3617,7 +3621,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase): ) @parameterized.expand(["POST", "DELETE"]) - def test_no_auth(self, method: str): + def test_no_auth(self, method: str) -> None: """ Try to get information of an user without authentication. """ @@ -3626,7 +3630,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase): self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) @parameterized.expand(["POST", "DELETE"]) - def test_requester_is_not_admin(self, method: str): + def test_requester_is_not_admin(self, method: str) -> None: """ If the user is not a server admin, an error is returned. """ @@ -3637,7 +3641,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase): self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) @parameterized.expand(["POST", "DELETE"]) - def test_user_is_not_local(self, method: str): + def test_user_is_not_local(self, method: str) -> None: """ Tests that shadow-banning for a user that is not a local returns a HTTPStatus.BAD_REQUEST """ @@ -3646,7 +3650,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase): channel = self.make_request(method, url, access_token=self.admin_user_tok) self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) - def test_success(self): + def test_success(self) -> None: """ Shadow-banning should succeed for an admin. """ @@ -3682,7 +3686,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase): login.register_servlets, ] - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastore() self.admin_user = self.register_user("admin", "pass", admin=True) @@ -3695,7 +3699,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase): ) @parameterized.expand(["GET", "POST", "DELETE"]) - def test_no_auth(self, method: str): + def test_no_auth(self, method: str) -> None: """ Try to get information of a user without authentication. """ @@ -3705,7 +3709,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase): self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) @parameterized.expand(["GET", "POST", "DELETE"]) - def test_requester_is_no_admin(self, method: str): + def test_requester_is_no_admin(self, method: str) -> None: """ If the user is not a server admin, an error is returned. """ @@ -3721,7 +3725,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase): self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) @parameterized.expand(["GET", "POST", "DELETE"]) - def test_user_does_not_exist(self, method: str): + def test_user_does_not_exist(self, method: str) -> None: """ Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND """ @@ -3743,7 +3747,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase): ("DELETE", "Only local users can be ratelimited"), ] ) - def test_user_is_not_local(self, method: str, error_msg: str): + def test_user_is_not_local(self, method: str, error_msg: str) -> None: """ Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST """ @@ -3760,7 +3764,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual(error_msg, channel.json_body["error"]) - def test_invalid_parameter(self): + def test_invalid_parameter(self) -> None: """ If parameters are invalid, an error is returned. """ @@ -3808,7 +3812,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"]) - def test_return_zero_when_null(self): + def test_return_zero_when_null(self) -> None: """ If values in database are `null` API should return an int `0` """ @@ -3834,7 +3838,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase): self.assertEqual(0, channel.json_body["messages_per_second"]) self.assertEqual(0, channel.json_body["burst_count"]) - def test_success(self): + def test_success(self) -> None: """ Rate-limiting (set/update/delete) should succeed for an admin. """ @@ -3908,7 +3912,7 @@ class AccountDataTestCase(unittest.HomeserverTestCase): login.register_servlets, ] - def prepare(self, reactor, clock, hs) -> None: + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastore() self.admin_user = self.register_user("admin", "pass", admin=True) |