diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 9711405735..272637e965 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -23,13 +23,17 @@ from unittest.mock import Mock, patch
from parameterized import parameterized, parameterized_class
+from twisted.test.proto_helpers import MemoryReactor
+
import synapse.rest.admin
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, HttpResponseException, ResourceLimitError
from synapse.api.room_versions import RoomVersions
from synapse.rest.client import devices, login, logout, profile, room, sync
from synapse.rest.media.v1.filepath import MediaFilePaths
+from synapse.server import HomeServer
from synapse.types import JsonDict, UserID
+from synapse.util import Clock
from tests import unittest
from tests.server import FakeSite, make_request
@@ -44,7 +48,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
profile.register_servlets,
]
- def make_homeserver(self, reactor, clock):
+ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.url = "/_synapse/admin/v1/register"
@@ -61,12 +65,12 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.hs.config.registration.registration_shared_secret = "shared"
- self.hs.get_media_repository = Mock()
- self.hs.get_deactivate_account_handler = Mock()
+ self.hs.get_media_repository = Mock() # type: ignore[assignment]
+ self.hs.get_deactivate_account_handler = Mock() # type: ignore[assignment]
return self.hs
- def test_disabled(self):
+ def test_disabled(self) -> None:
"""
If there is no shared secret, registration through this method will be
prevented.
@@ -80,7 +84,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
"Shared secret registration is not enabled", channel.json_body["error"]
)
- def test_get_nonce(self):
+ def test_get_nonce(self) -> None:
"""
Calling GET on the endpoint will return a randomised nonce, using the
homeserver's secrets provider.
@@ -93,7 +97,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.json_body, {"nonce": "abcd"})
- def test_expired_nonce(self):
+ def test_expired_nonce(self) -> None:
"""
Calling GET on the endpoint will return a randomised nonce, which will
only last for SALT_TIMEOUT (60s).
@@ -118,7 +122,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("unrecognised nonce", channel.json_body["error"])
- def test_register_incorrect_nonce(self):
+ def test_register_incorrect_nonce(self) -> None:
"""
Only the provided nonce can be used, as it's checked in the MAC.
"""
@@ -141,7 +145,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual("HMAC incorrect", channel.json_body["error"])
- def test_register_correct_nonce(self):
+ def test_register_correct_nonce(self) -> None:
"""
When the correct nonce is provided, and the right key is provided, the
user is registered.
@@ -168,7 +172,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["user_id"])
- def test_nonce_reuse(self):
+ def test_nonce_reuse(self) -> None:
"""
A valid unrecognised nonce.
"""
@@ -197,14 +201,14 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("unrecognised nonce", channel.json_body["error"])
- def test_missing_parts(self):
+ def test_missing_parts(self) -> None:
"""
Synapse will complain if you don't give nonce, username, password, and
mac. Admin and user_types are optional. Additional checks are done for length
and type.
"""
- def nonce():
+ def nonce() -> str:
channel = self.make_request("GET", self.url)
return channel.json_body["nonce"]
@@ -297,7 +301,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Invalid user type", channel.json_body["error"])
- def test_displayname(self):
+ def test_displayname(self) -> None:
"""
Test that displayname of new user is set
"""
@@ -400,7 +404,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
@override_config(
{"limit_usage_by_mau": True, "max_mau_value": 2, "mau_trial_days": 0}
)
- def test_register_mau_limit_reached(self):
+ def test_register_mau_limit_reached(self) -> None:
"""
Check we can register a user via the shared secret registration API
even if the MAU limit is reached.
@@ -450,13 +454,13 @@ class UsersListTestCase(unittest.HomeserverTestCase):
]
url = "/_synapse/admin/v2/users"
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to list users without authentication.
"""
@@ -465,7 +469,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -477,7 +481,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_all_users(self):
+ def test_all_users(self) -> None:
"""
List all users, including deactivated users.
"""
@@ -497,7 +501,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
# Check that all fields are available
self._check_fields(channel.json_body["users"])
- def test_search_term(self):
+ def test_search_term(self) -> None:
"""Test that searching for a users works correctly"""
def _search_test(
@@ -505,7 +509,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
search_term: str,
search_field: Optional[str] = "name",
expected_http_code: Optional[int] = HTTPStatus.OK,
- ):
+ ) -> None:
"""Search for a user and check that the returned user's id is a match
Args:
@@ -575,7 +579,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
_search_test(None, "foo", "user_id")
_search_test(None, "bar", "user_id")
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
@@ -640,7 +644,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
- def test_limit(self):
+ def test_limit(self) -> None:
"""
Testing list of users with limit
"""
@@ -661,7 +665,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.json_body["next_token"], "5")
self._check_fields(channel.json_body["users"])
- def test_from(self):
+ def test_from(self) -> None:
"""
Testing list of users with a defined starting point (from)
"""
@@ -682,7 +686,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["users"])
- def test_limit_and_from(self):
+ def test_limit_and_from(self) -> None:
"""
Testing list of users with a defined starting point and limit
"""
@@ -703,7 +707,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["users"]), 10)
self._check_fields(channel.json_body["users"])
- def test_next_token(self):
+ def test_next_token(self) -> None:
"""
Testing that `next_token` appears at the right place
"""
@@ -765,7 +769,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["users"]), 1)
self.assertNotIn("next_token", channel.json_body)
- def test_order_by(self):
+ def test_order_by(self) -> None:
"""
Testing order list with parameter `order_by`
"""
@@ -843,7 +847,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
expected_user_list: List[str],
order_by: Optional[str],
dir: Optional[str] = None,
- ):
+ ) -> None:
"""Request the list of users in a certain order. Assert that order is what
we expect
Args:
@@ -870,7 +874,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(expected_user_list, returned_order)
self._check_fields(channel.json_body["users"])
- def _check_fields(self, content: List[JsonDict]):
+ def _check_fields(self, content: List[JsonDict]) -> None:
"""Checks that the expected user attributes are present in content
Args:
content: List that is checked for content
@@ -886,7 +890,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertIn("avatar_url", u)
self.assertIn("creation_ts", u)
- def _create_users(self, number_users: int):
+ def _create_users(self, number_users: int) -> None:
"""
Create a number of users
Args:
@@ -908,7 +912,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -931,7 +935,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.store.user_add_threepid("@user:test", "email", "foo@bar.com", 0, 0)
)
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to deactivate users without authentication.
"""
@@ -940,7 +944,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_not_admin(self):
+ def test_requester_is_not_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -961,7 +965,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual("You are not a server admin", channel.json_body["error"])
- def test_user_does_not_exist(self):
+ def test_user_does_not_exist(self) -> None:
"""
Tests that deactivation for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
@@ -975,7 +979,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
- def test_erase_is_not_bool(self):
+ def test_erase_is_not_bool(self) -> None:
"""
If parameter `erase` is not boolean, return an error
"""
@@ -990,7 +994,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
- def test_user_is_not_local(self):
+ def test_user_is_not_local(self) -> None:
"""
Tests that deactivation for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
@@ -1001,7 +1005,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only deactivate local users", channel.json_body["error"])
- def test_deactivate_user_erase_true(self):
+ def test_deactivate_user_erase_true(self) -> None:
"""
Test deactivating a user and set `erase` to `true`
"""
@@ -1046,7 +1050,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self._is_erased("@user:test", True)
- def test_deactivate_user_erase_false(self):
+ def test_deactivate_user_erase_false(self) -> None:
"""
Test deactivating a user and set `erase` to `false`
"""
@@ -1091,7 +1095,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self._is_erased("@user:test", False)
- def test_deactivate_user_erase_true_no_profile(self):
+ def test_deactivate_user_erase_true_no_profile(self) -> None:
"""
Test deactivating a user and set `erase` to `true`
if user has no profile information (stored in the database table `profiles`).
@@ -1162,7 +1166,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
sync.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.auth_handler = hs.get_auth_handler()
@@ -1185,7 +1189,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.url_prefix = "/_synapse/admin/v2/users/%s"
self.url_other_user = self.url_prefix % self.other_user
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -1210,7 +1214,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual("You are not a server admin", channel.json_body["error"])
- def test_user_does_not_exist(self):
+ def test_user_does_not_exist(self) -> None:
"""
Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
@@ -1224,7 +1228,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual("M_NOT_FOUND", channel.json_body["errcode"])
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
@@ -1319,7 +1323,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
- def test_get_user(self):
+ def test_get_user(self) -> None:
"""
Test a simple get of a user.
"""
@@ -1334,7 +1338,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("User", channel.json_body["displayname"])
self._check_fields(channel.json_body)
- def test_create_server_admin(self):
+ def test_create_server_admin(self) -> None:
"""
Check that a new admin user is created successfully.
"""
@@ -1383,7 +1387,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("mxc://fibble/wibble", channel.json_body["avatar_url"])
self._check_fields(channel.json_body)
- def test_create_user(self):
+ def test_create_user(self) -> None:
"""
Check that a new regular user is created successfully.
"""
@@ -1450,7 +1454,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
@override_config(
{"limit_usage_by_mau": True, "max_mau_value": 2, "mau_trial_days": 0}
)
- def test_create_user_mau_limit_reached_active_admin(self):
+ def test_create_user_mau_limit_reached_active_admin(self) -> None:
"""
Check that an admin can register a new user via the admin API
even if the MAU limit is reached.
@@ -1496,7 +1500,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
@override_config(
{"limit_usage_by_mau": True, "max_mau_value": 2, "mau_trial_days": 0}
)
- def test_create_user_mau_limit_reached_passive_admin(self):
+ def test_create_user_mau_limit_reached_passive_admin(self) -> None:
"""
Check that an admin can register a new user via the admin API
even if the MAU limit is reached.
@@ -1541,7 +1545,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
"public_baseurl": "https://example.com",
}
)
- def test_create_user_email_notif_for_new_users(self):
+ def test_create_user_email_notif_for_new_users(self) -> None:
"""
Check that a new regular user is created successfully and
got an email pusher.
@@ -1584,7 +1588,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
"public_baseurl": "https://example.com",
}
)
- def test_create_user_email_no_notif_for_new_users(self):
+ def test_create_user_email_no_notif_for_new_users(self) -> None:
"""
Check that a new regular user is created successfully and
got not an email pusher.
@@ -1615,7 +1619,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
pushers = list(pushers)
self.assertEqual(len(pushers), 0)
- def test_set_password(self):
+ def test_set_password(self) -> None:
"""
Test setting a new password for another user.
"""
@@ -1631,7 +1635,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self._check_fields(channel.json_body)
- def test_set_displayname(self):
+ def test_set_displayname(self) -> None:
"""
Test setting the displayname of another user.
"""
@@ -1659,7 +1663,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual("foobar", channel.json_body["displayname"])
- def test_set_threepid(self):
+ def test_set_threepid(self) -> None:
"""
Test setting threepid for an other user.
"""
@@ -1740,7 +1744,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, len(channel.json_body["threepids"]))
self._check_fields(channel.json_body)
- def test_set_duplicate_threepid(self):
+ def test_set_duplicate_threepid(self) -> None:
"""
Test setting the same threepid for a second user.
First user loses and second user gets mapping of this threepid.
@@ -1827,7 +1831,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, len(channel.json_body["threepids"]))
self._check_fields(channel.json_body)
- def test_set_external_id(self):
+ def test_set_external_id(self) -> None:
"""
Test setting external id for an other user.
"""
@@ -1925,7 +1929,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(0, len(channel.json_body["external_ids"]))
- def test_set_duplicate_external_id(self):
+ def test_set_duplicate_external_id(self) -> None:
"""
Test that setting the same external id for a second user fails and
external id from user must not be changed.
@@ -2048,7 +2052,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
)
self._check_fields(channel.json_body)
- def test_deactivate_user(self):
+ def test_deactivate_user(self) -> None:
"""
Test deactivating another user.
"""
@@ -2113,7 +2117,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertNotIn("password_hash", channel.json_body)
@override_config({"user_directory": {"enabled": True, "search_all_users": True}})
- def test_change_name_deactivate_user_user_directory(self):
+ def test_change_name_deactivate_user_user_directory(self) -> None:
"""
Test change profile information of a deactivated user and
check that it does not appear in user directory
@@ -2156,7 +2160,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
profile = self.get_success(self.store.get_user_in_directory(self.other_user))
self.assertIsNone(profile)
- def test_reactivate_user(self):
+ def test_reactivate_user(self) -> None:
"""
Test reactivating another user.
"""
@@ -2189,7 +2193,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertNotIn("password_hash", channel.json_body)
@override_config({"password_config": {"localdb_enabled": False}})
- def test_reactivate_user_localdb_disabled(self):
+ def test_reactivate_user_localdb_disabled(self) -> None:
"""
Test reactivating another user when using SSO.
"""
@@ -2223,7 +2227,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertNotIn("password_hash", channel.json_body)
@override_config({"password_config": {"enabled": False}})
- def test_reactivate_user_password_disabled(self):
+ def test_reactivate_user_password_disabled(self) -> None:
"""
Test reactivating another user when using SSO.
"""
@@ -2256,7 +2260,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
# This key was removed intentionally. Ensure it is not accidentally re-included.
self.assertNotIn("password_hash", channel.json_body)
- def test_set_user_as_admin(self):
+ def test_set_user_as_admin(self) -> None:
"""
Test setting the admin flag on a user.
"""
@@ -2284,7 +2288,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("@user:test", channel.json_body["name"])
self.assertTrue(channel.json_body["admin"])
- def test_set_user_type(self):
+ def test_set_user_type(self) -> None:
"""
Test changing user type.
"""
@@ -2335,7 +2339,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("@user:test", channel.json_body["name"])
self.assertIsNone(channel.json_body["user_type"])
- def test_accidental_deactivation_prevention(self):
+ def test_accidental_deactivation_prevention(self) -> None:
"""
Ensure an account can't accidentally be deactivated by using a str value
for the deactivated body parameter
@@ -2418,7 +2422,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
# This key was removed intentionally. Ensure it is not accidentally re-included.
self.assertNotIn("password_hash", channel.json_body)
- def _check_fields(self, content: JsonDict):
+ def _check_fields(self, content: JsonDict) -> None:
"""Checks that the expected user attributes are present in content
Args:
@@ -2448,7 +2452,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
room.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -2457,7 +2461,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.other_user
)
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to list rooms of an user without authentication.
"""
@@ -2466,7 +2470,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -2481,7 +2485,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_user_does_not_exist(self):
+ def test_user_does_not_exist(self) -> None:
"""
Tests that a lookup for a user that does not exist returns an empty list
"""
@@ -2496,7 +2500,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["joined_rooms"]))
- def test_user_is_not_local(self):
+ def test_user_is_not_local(self) -> None:
"""
Tests that a lookup for a user that is not a local and participates in no conversation returns an empty list
"""
@@ -2512,7 +2516,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["joined_rooms"]))
- def test_no_memberships(self):
+ def test_no_memberships(self) -> None:
"""
Tests that a normal lookup for rooms is successfully
if user has no memberships
@@ -2528,7 +2532,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["joined_rooms"]))
- def test_get_rooms(self):
+ def test_get_rooms(self) -> None:
"""
Tests that a normal lookup for rooms is successfully
"""
@@ -2549,7 +2553,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(number_rooms, channel.json_body["total"])
self.assertEqual(number_rooms, len(channel.json_body["joined_rooms"]))
- def test_get_rooms_with_nonlocal_user(self):
+ def test_get_rooms_with_nonlocal_user(self) -> None:
"""
Tests that a normal lookup for rooms is successful with a non-local user
"""
@@ -2604,7 +2608,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -2615,7 +2619,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.other_user
)
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to list pushers of an user without authentication.
"""
@@ -2624,7 +2628,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -2639,7 +2643,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_user_does_not_exist(self):
+ def test_user_does_not_exist(self) -> None:
"""
Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
@@ -2653,7 +2657,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
- def test_user_is_not_local(self):
+ def test_user_is_not_local(self) -> None:
"""
Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
@@ -2668,7 +2672,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only look up local users", channel.json_body["error"])
- def test_get_pushers(self):
+ def test_get_pushers(self) -> None:
"""
Tests that a normal lookup for pushers is successfully
"""
@@ -2732,7 +2736,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.media_repo = hs.get_media_repository_resource()
self.filepaths = MediaFilePaths(hs.config.media.media_store_path)
@@ -2746,7 +2750,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
)
@parameterized.expand(["GET", "DELETE"])
- def test_no_auth(self, method: str):
+ def test_no_auth(self, method: str) -> None:
"""Try to list media of an user without authentication."""
channel = self.make_request(method, self.url, {})
@@ -2754,7 +2758,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "DELETE"])
- def test_requester_is_no_admin(self, method: str):
+ def test_requester_is_no_admin(self, method: str) -> None:
"""If the user is not a server admin, an error is returned."""
other_user_token = self.login("user", "pass")
@@ -2768,7 +2772,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "DELETE"])
- def test_user_does_not_exist(self, method: str):
+ def test_user_does_not_exist(self, method: str) -> None:
"""Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND"""
url = "/_synapse/admin/v1/users/@unknown_person:test/media"
channel = self.make_request(
@@ -2781,7 +2785,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
@parameterized.expand(["GET", "DELETE"])
- def test_user_is_not_local(self, method: str):
+ def test_user_is_not_local(self, method: str) -> None:
"""Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST"""
url = "/_synapse/admin/v1/users/@unknown_person:unknown_domain/media"
@@ -2794,7 +2798,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only look up local users", channel.json_body["error"])
- def test_limit_GET(self):
+ def test_limit_GET(self) -> None:
"""Testing list of media with limit"""
number_media = 20
@@ -2813,7 +2817,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.json_body["next_token"], 5)
self._check_fields(channel.json_body["media"])
- def test_limit_DELETE(self):
+ def test_limit_DELETE(self) -> None:
"""Testing delete of media with limit"""
number_media = 20
@@ -2830,7 +2834,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.json_body["total"], 5)
self.assertEqual(len(channel.json_body["deleted_media"]), 5)
- def test_from_GET(self):
+ def test_from_GET(self) -> None:
"""Testing list of media with a defined starting point (from)"""
number_media = 20
@@ -2849,7 +2853,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["media"])
- def test_from_DELETE(self):
+ def test_from_DELETE(self) -> None:
"""Testing delete of media with a defined starting point (from)"""
number_media = 20
@@ -2866,7 +2870,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.json_body["total"], 15)
self.assertEqual(len(channel.json_body["deleted_media"]), 15)
- def test_limit_and_from_GET(self):
+ def test_limit_and_from_GET(self) -> None:
"""Testing list of media with a defined starting point and limit"""
number_media = 20
@@ -2885,7 +2889,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["media"]), 10)
self._check_fields(channel.json_body["media"])
- def test_limit_and_from_DELETE(self):
+ def test_limit_and_from_DELETE(self) -> None:
"""Testing delete of media with a defined starting point and limit"""
number_media = 20
@@ -2903,7 +2907,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["deleted_media"]), 10)
@parameterized.expand(["GET", "DELETE"])
- def test_invalid_parameter(self, method: str):
+ def test_invalid_parameter(self, method: str) -> None:
"""If parameters are invalid, an error is returned."""
# unkown order_by
channel = self.make_request(
@@ -2945,7 +2949,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
- def test_next_token(self):
+ def test_next_token(self) -> None:
"""
Testing that `next_token` appears at the right place
@@ -3010,7 +3014,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["media"]), 1)
self.assertNotIn("next_token", channel.json_body)
- def test_user_has_no_media_GET(self):
+ def test_user_has_no_media_GET(self) -> None:
"""
Tests that a normal lookup for media is successfully
if user has no media created
@@ -3026,7 +3030,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["media"]))
- def test_user_has_no_media_DELETE(self):
+ def test_user_has_no_media_DELETE(self) -> None:
"""
Tests that a delete is successful if user has no media
"""
@@ -3041,7 +3045,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["deleted_media"]))
- def test_get_media(self):
+ def test_get_media(self) -> None:
"""Tests that a normal lookup for media is successful"""
number_media = 5
@@ -3060,7 +3064,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["media"])
- def test_delete_media(self):
+ def test_delete_media(self) -> None:
"""Tests that a normal delete of media is successful"""
number_media = 5
@@ -3089,7 +3093,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
for local_path in local_paths:
self.assertFalse(os.path.exists(local_path))
- def test_order_by(self):
+ def test_order_by(self) -> None:
"""
Testing order list with parameter `order_by`
"""
@@ -3252,7 +3256,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
return media_id
- def _check_fields(self, content: List[JsonDict]):
+ def _check_fields(self, content: List[JsonDict]) -> None:
"""Checks that the expected user attributes are present in content
Args:
content: List that is checked for content
@@ -3272,7 +3276,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
expected_media_list: List[str],
order_by: Optional[str],
dir: Optional[str] = None,
- ):
+ ) -> None:
"""Request the list of media in a certain order. Assert that order is what
we expect
Args:
@@ -3312,7 +3316,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
logout.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -3331,14 +3335,14 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
return channel.json_body["access_token"]
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""Try to login as a user without authentication."""
channel = self.make_request("POST", self.url, b"{}")
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_not_admin(self):
+ def test_not_admin(self) -> None:
"""Try to login as a user as a non-admin user."""
channel = self.make_request(
"POST", self.url, b"{}", access_token=self.other_user_tok
@@ -3346,7 +3350,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
- def test_send_event(self):
+ def test_send_event(self) -> None:
"""Test that sending event as a user works."""
# Create a room.
room_id = self.helper.create_room_as(self.other_user, tok=self.other_user_tok)
@@ -3360,7 +3364,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
event = self.get_success(self.store.get_event(event_id))
self.assertEqual(event.sender, self.other_user)
- def test_devices(self):
+ def test_devices(self) -> None:
"""Tests that logging in as a user doesn't create a new device for them."""
# Login in as the user
self._get_token()
@@ -3374,7 +3378,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
# We should only see the one device (from the login in `prepare`)
self.assertEqual(len(channel.json_body["devices"]), 1)
- def test_logout(self):
+ def test_logout(self) -> None:
"""Test that calling `/logout` with the token works."""
# Login in as the user
puppet_token = self._get_token()
@@ -3397,7 +3401,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
- def test_user_logout_all(self):
+ def test_user_logout_all(self) -> None:
"""Tests that the target user calling `/logout/all` does *not* expire
the token.
"""
@@ -3424,7 +3428,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
)
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
- def test_admin_logout_all(self):
+ def test_admin_logout_all(self) -> None:
"""Tests that the admin user calling `/logout/all` does expire the
token.
"""
@@ -3464,7 +3468,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
"form_secret": "123secret",
}
)
- def test_consent(self):
+ def test_consent(self) -> None:
"""Test that sending a message is not subject to the privacy policies."""
# Have the admin user accept the terms.
self.get_success(self.store.user_set_consent_version(self.admin_user, "1.0"))
@@ -3492,7 +3496,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
@override_config(
{"limit_usage_by_mau": True, "max_mau_value": 1, "mau_trial_days": 0}
)
- def test_mau_limit(self):
+ def test_mau_limit(self) -> None:
# Create a room as the admin user. This will bump the monthly active users to 1.
room_id = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
@@ -3524,14 +3528,14 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
self.other_user = self.register_user("user", "pass")
- self.url = self.url_prefix % self.other_user
+ self.url = self.url_prefix % self.other_user # type: ignore[attr-defined]
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to get information of an user without authentication.
"""
@@ -3539,7 +3543,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_not_admin(self):
+ def test_requester_is_not_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -3554,11 +3558,11 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_user_is_not_local(self):
+ def test_user_is_not_local(self) -> None:
"""
Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
- url = self.url_prefix % "@unknown_person:unknown_domain"
+ url = self.url_prefix % "@unknown_person:unknown_domain" # type: ignore[attr-defined]
channel = self.make_request(
"GET",
@@ -3568,7 +3572,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only whois a local user", channel.json_body["error"])
- def test_get_whois_admin(self):
+ def test_get_whois_admin(self) -> None:
"""
The lookup should succeed for an admin.
"""
@@ -3581,7 +3585,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(self.other_user, channel.json_body["user_id"])
self.assertIn("devices", channel.json_body)
- def test_get_whois_user(self):
+ def test_get_whois_user(self) -> None:
"""
The lookup should succeed for a normal user looking up their own information.
"""
@@ -3604,7 +3608,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -3617,7 +3621,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
)
@parameterized.expand(["POST", "DELETE"])
- def test_no_auth(self, method: str):
+ def test_no_auth(self, method: str) -> None:
"""
Try to get information of an user without authentication.
"""
@@ -3626,7 +3630,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["POST", "DELETE"])
- def test_requester_is_not_admin(self, method: str):
+ def test_requester_is_not_admin(self, method: str) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -3637,7 +3641,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand(["POST", "DELETE"])
- def test_user_is_not_local(self, method: str):
+ def test_user_is_not_local(self, method: str) -> None:
"""
Tests that shadow-banning for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
@@ -3646,7 +3650,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
channel = self.make_request(method, url, access_token=self.admin_user_tok)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
- def test_success(self):
+ def test_success(self) -> None:
"""
Shadow-banning should succeed for an admin.
"""
@@ -3682,7 +3686,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -3695,7 +3699,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
)
@parameterized.expand(["GET", "POST", "DELETE"])
- def test_no_auth(self, method: str):
+ def test_no_auth(self, method: str) -> None:
"""
Try to get information of a user without authentication.
"""
@@ -3705,7 +3709,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "POST", "DELETE"])
- def test_requester_is_no_admin(self, method: str):
+ def test_requester_is_no_admin(self, method: str) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -3721,7 +3725,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "POST", "DELETE"])
- def test_user_does_not_exist(self, method: str):
+ def test_user_does_not_exist(self, method: str) -> None:
"""
Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
@@ -3743,7 +3747,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
("DELETE", "Only local users can be ratelimited"),
]
)
- def test_user_is_not_local(self, method: str, error_msg: str):
+ def test_user_is_not_local(self, method: str, error_msg: str) -> None:
"""
Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
@@ -3760,7 +3764,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(error_msg, channel.json_body["error"])
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
@@ -3808,7 +3812,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
- def test_return_zero_when_null(self):
+ def test_return_zero_when_null(self) -> None:
"""
If values in database are `null` API should return an int `0`
"""
@@ -3834,7 +3838,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["messages_per_second"])
self.assertEqual(0, channel.json_body["burst_count"])
- def test_success(self):
+ def test_success(self) -> None:
"""
Rate-limiting (set/update/delete) should succeed for an admin.
"""
@@ -3908,7 +3912,7 @@ class AccountDataTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs) -> None:
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
|