diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py
index 3adadcb46b..849d00ab4d 100644
--- a/tests/rest/admin/test_admin.py
+++ b/tests/rest/admin/test_admin.py
@@ -12,18 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
import urllib.parse
from http import HTTPStatus
-from unittest.mock import Mock
+from typing import List
-from twisted.internet.defer import Deferred
+from parameterized import parameterized
+
+from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.http.server import JsonResource
-from synapse.logging.context import make_deferred_yieldable
from synapse.rest.admin import VersionServlet
from synapse.rest.client import groups, login, room
+from synapse.server import HomeServer
+from synapse.util import Clock
from tests import unittest
from tests.server import FakeSite, make_request
@@ -33,12 +35,12 @@ from tests.test_utils import SMALL_PNG
class VersionTestCase(unittest.HomeserverTestCase):
url = "/_synapse/admin/v1/server_version"
- def create_test_resource(self):
+ def create_test_resource(self) -> JsonResource:
resource = JsonResource(self.hs)
VersionServlet(self.hs).register(resource)
return resource
- def test_version_string(self):
+ def test_version_string(self) -> None:
channel = self.make_request("GET", self.url, shorthand=False)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
@@ -54,14 +56,14 @@ class DeleteGroupTestCase(unittest.HomeserverTestCase):
groups.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
self.other_user = self.register_user("user", "pass")
self.other_user_token = self.login("user", "pass")
- def test_delete_group(self):
+ def test_delete_group(self) -> None:
# Create a new group
channel = self.make_request(
"POST",
@@ -112,7 +114,7 @@ class DeleteGroupTestCase(unittest.HomeserverTestCase):
self.assertNotIn(group_id, self._get_groups_user_is_in(self.admin_user_tok))
self.assertNotIn(group_id, self._get_groups_user_is_in(self.other_user_token))
- def _check_group(self, group_id, expect_code):
+ def _check_group(self, group_id: str, expect_code: int) -> None:
"""Assert that trying to fetch the given group results in the given
HTTP status code
"""
@@ -124,7 +126,7 @@ class DeleteGroupTestCase(unittest.HomeserverTestCase):
self.assertEqual(expect_code, channel.code, msg=channel.json_body)
- def _get_groups_user_is_in(self, access_token):
+ def _get_groups_user_is_in(self, access_token: str) -> List[str]:
"""Returns the list of groups the user is in (given their access token)"""
channel = self.make_request("GET", b"/joined_groups", access_token=access_token)
@@ -143,59 +145,15 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
room.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
# Allow for uploading and downloading to/from the media repo
self.media_repo = hs.get_media_repository_resource()
self.download_resource = self.media_repo.children[b"download"]
self.upload_resource = self.media_repo.children[b"upload"]
- def make_homeserver(self, reactor, clock):
-
- self.fetches = []
-
- async def get_file(destination, path, output_stream, args=None, max_size=None):
- """
- Returns tuple[int,dict,str,int] of file length, response headers,
- absolute URI, and response code.
- """
-
- def write_to(r):
- data, response = r
- output_stream.write(data)
- return response
-
- d = Deferred()
- d.addCallback(write_to)
- self.fetches.append((d, destination, path, args))
- return await make_deferred_yieldable(d)
-
- client = Mock()
- client.get_file = get_file
-
- self.storage_path = self.mktemp()
- self.media_store_path = self.mktemp()
- os.mkdir(self.storage_path)
- os.mkdir(self.media_store_path)
-
- config = self.default_config()
- config["media_store_path"] = self.media_store_path
- config["thumbnail_requirements"] = {}
- config["max_image_pixels"] = 2000000
-
- provider_config = {
- "module": "synapse.rest.media.v1.storage_provider.FileStorageProviderBackend",
- "store_local": True,
- "store_synchronous": False,
- "store_remote": True,
- "config": {"directory": self.storage_path},
- }
- config["media_storage_providers"] = [provider_config]
-
- hs = self.setup_test_homeserver(config=config, federation_http_client=client)
-
- return hs
-
- def _ensure_quarantined(self, admin_user_tok, server_and_media_id):
+ def _ensure_quarantined(
+ self, admin_user_tok: str, server_and_media_id: str
+ ) -> None:
"""Ensure a piece of media is quarantined when trying to access it."""
channel = make_request(
self.reactor,
@@ -216,12 +174,18 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
),
)
- def test_quarantine_media_requires_admin(self):
+ @parameterized.expand(
+ [
+ # Attempt quarantine media APIs as non-admin
+ "/_synapse/admin/v1/media/quarantine/example.org/abcde12345",
+ # And the roomID/userID endpoint
+ "/_synapse/admin/v1/room/!room%3Aexample.com/media/quarantine",
+ ]
+ )
+ def test_quarantine_media_requires_admin(self, url: str) -> None:
self.register_user("nonadmin", "pass", admin=False)
non_admin_user_tok = self.login("nonadmin", "pass")
- # Attempt quarantine media APIs as non-admin
- url = "/_synapse/admin/v1/media/quarantine/example.org/abcde12345"
channel = self.make_request(
"POST",
url.encode("ascii"),
@@ -235,22 +199,7 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
msg="Expected forbidden on quarantining media as a non-admin",
)
- # And the roomID/userID endpoint
- url = "/_synapse/admin/v1/room/!room%3Aexample.com/media/quarantine"
- channel = self.make_request(
- "POST",
- url.encode("ascii"),
- access_token=non_admin_user_tok,
- )
-
- # Expect a forbidden error
- self.assertEqual(
- HTTPStatus.FORBIDDEN,
- channel.code,
- msg="Expected forbidden on quarantining media as a non-admin",
- )
-
- def test_quarantine_media_by_id(self):
+ def test_quarantine_media_by_id(self) -> None:
self.register_user("id_admin", "pass", admin=True)
admin_user_tok = self.login("id_admin", "pass")
@@ -295,7 +244,15 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
# Attempt to access the media
self._ensure_quarantined(admin_user_tok, server_name_and_media_id)
- def test_quarantine_all_media_in_room(self, override_url_template=None):
+ @parameterized.expand(
+ [
+ # regular API path
+ "/_synapse/admin/v1/room/%s/media/quarantine",
+ # deprecated API path
+ "/_synapse/admin/v1/quarantine_media/%s",
+ ]
+ )
+ def test_quarantine_all_media_in_room(self, url: str) -> None:
self.register_user("room_admin", "pass", admin=True)
admin_user_tok = self.login("room_admin", "pass")
@@ -333,16 +290,9 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
tok=non_admin_user_tok,
)
- # Quarantine all media in the room
- if override_url_template:
- url = override_url_template % urllib.parse.quote(room_id)
- else:
- url = "/_synapse/admin/v1/room/%s/media/quarantine" % urllib.parse.quote(
- room_id
- )
channel = self.make_request(
"POST",
- url,
+ url % urllib.parse.quote(room_id),
access_token=admin_user_tok,
)
self.pump(1.0)
@@ -359,11 +309,7 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
self._ensure_quarantined(admin_user_tok, server_and_media_id_1)
self._ensure_quarantined(admin_user_tok, server_and_media_id_2)
- def test_quarantine_all_media_in_room_deprecated_api_path(self):
- # Perform the above test with the deprecated API path
- self.test_quarantine_all_media_in_room("/_synapse/admin/v1/quarantine_media/%s")
-
- def test_quarantine_all_media_by_user(self):
+ def test_quarantine_all_media_by_user(self) -> None:
self.register_user("user_admin", "pass", admin=True)
admin_user_tok = self.login("user_admin", "pass")
@@ -401,7 +347,7 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
self._ensure_quarantined(admin_user_tok, server_and_media_id_1)
self._ensure_quarantined(admin_user_tok, server_and_media_id_2)
- def test_cannot_quarantine_safe_media(self):
+ def test_cannot_quarantine_safe_media(self) -> None:
self.register_user("user_admin", "pass", admin=True)
admin_user_tok = self.login("user_admin", "pass")
@@ -475,7 +421,7 @@ class PurgeHistoryTestCase(unittest.HomeserverTestCase):
room.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -488,7 +434,7 @@ class PurgeHistoryTestCase(unittest.HomeserverTestCase):
self.url = f"/_synapse/admin/v1/purge_history/{self.room_id}"
self.url_status = "/_synapse/admin/v1/purge_history_status/"
- def test_purge_history(self):
+ def test_purge_history(self) -> None:
"""
Simple test of purge history API.
Test only that is is possible to call, get status HTTPStatus.OK and purge_id.
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 9711405735..272637e965 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -23,13 +23,17 @@ from unittest.mock import Mock, patch
from parameterized import parameterized, parameterized_class
+from twisted.test.proto_helpers import MemoryReactor
+
import synapse.rest.admin
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, HttpResponseException, ResourceLimitError
from synapse.api.room_versions import RoomVersions
from synapse.rest.client import devices, login, logout, profile, room, sync
from synapse.rest.media.v1.filepath import MediaFilePaths
+from synapse.server import HomeServer
from synapse.types import JsonDict, UserID
+from synapse.util import Clock
from tests import unittest
from tests.server import FakeSite, make_request
@@ -44,7 +48,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
profile.register_servlets,
]
- def make_homeserver(self, reactor, clock):
+ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.url = "/_synapse/admin/v1/register"
@@ -61,12 +65,12 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.hs.config.registration.registration_shared_secret = "shared"
- self.hs.get_media_repository = Mock()
- self.hs.get_deactivate_account_handler = Mock()
+ self.hs.get_media_repository = Mock() # type: ignore[assignment]
+ self.hs.get_deactivate_account_handler = Mock() # type: ignore[assignment]
return self.hs
- def test_disabled(self):
+ def test_disabled(self) -> None:
"""
If there is no shared secret, registration through this method will be
prevented.
@@ -80,7 +84,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
"Shared secret registration is not enabled", channel.json_body["error"]
)
- def test_get_nonce(self):
+ def test_get_nonce(self) -> None:
"""
Calling GET on the endpoint will return a randomised nonce, using the
homeserver's secrets provider.
@@ -93,7 +97,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.json_body, {"nonce": "abcd"})
- def test_expired_nonce(self):
+ def test_expired_nonce(self) -> None:
"""
Calling GET on the endpoint will return a randomised nonce, which will
only last for SALT_TIMEOUT (60s).
@@ -118,7 +122,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("unrecognised nonce", channel.json_body["error"])
- def test_register_incorrect_nonce(self):
+ def test_register_incorrect_nonce(self) -> None:
"""
Only the provided nonce can be used, as it's checked in the MAC.
"""
@@ -141,7 +145,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual("HMAC incorrect", channel.json_body["error"])
- def test_register_correct_nonce(self):
+ def test_register_correct_nonce(self) -> None:
"""
When the correct nonce is provided, and the right key is provided, the
user is registered.
@@ -168,7 +172,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["user_id"])
- def test_nonce_reuse(self):
+ def test_nonce_reuse(self) -> None:
"""
A valid unrecognised nonce.
"""
@@ -197,14 +201,14 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("unrecognised nonce", channel.json_body["error"])
- def test_missing_parts(self):
+ def test_missing_parts(self) -> None:
"""
Synapse will complain if you don't give nonce, username, password, and
mac. Admin and user_types are optional. Additional checks are done for length
and type.
"""
- def nonce():
+ def nonce() -> str:
channel = self.make_request("GET", self.url)
return channel.json_body["nonce"]
@@ -297,7 +301,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Invalid user type", channel.json_body["error"])
- def test_displayname(self):
+ def test_displayname(self) -> None:
"""
Test that displayname of new user is set
"""
@@ -400,7 +404,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
@override_config(
{"limit_usage_by_mau": True, "max_mau_value": 2, "mau_trial_days": 0}
)
- def test_register_mau_limit_reached(self):
+ def test_register_mau_limit_reached(self) -> None:
"""
Check we can register a user via the shared secret registration API
even if the MAU limit is reached.
@@ -450,13 +454,13 @@ class UsersListTestCase(unittest.HomeserverTestCase):
]
url = "/_synapse/admin/v2/users"
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to list users without authentication.
"""
@@ -465,7 +469,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -477,7 +481,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_all_users(self):
+ def test_all_users(self) -> None:
"""
List all users, including deactivated users.
"""
@@ -497,7 +501,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
# Check that all fields are available
self._check_fields(channel.json_body["users"])
- def test_search_term(self):
+ def test_search_term(self) -> None:
"""Test that searching for a users works correctly"""
def _search_test(
@@ -505,7 +509,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
search_term: str,
search_field: Optional[str] = "name",
expected_http_code: Optional[int] = HTTPStatus.OK,
- ):
+ ) -> None:
"""Search for a user and check that the returned user's id is a match
Args:
@@ -575,7 +579,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
_search_test(None, "foo", "user_id")
_search_test(None, "bar", "user_id")
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
@@ -640,7 +644,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
- def test_limit(self):
+ def test_limit(self) -> None:
"""
Testing list of users with limit
"""
@@ -661,7 +665,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.json_body["next_token"], "5")
self._check_fields(channel.json_body["users"])
- def test_from(self):
+ def test_from(self) -> None:
"""
Testing list of users with a defined starting point (from)
"""
@@ -682,7 +686,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["users"])
- def test_limit_and_from(self):
+ def test_limit_and_from(self) -> None:
"""
Testing list of users with a defined starting point and limit
"""
@@ -703,7 +707,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["users"]), 10)
self._check_fields(channel.json_body["users"])
- def test_next_token(self):
+ def test_next_token(self) -> None:
"""
Testing that `next_token` appears at the right place
"""
@@ -765,7 +769,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["users"]), 1)
self.assertNotIn("next_token", channel.json_body)
- def test_order_by(self):
+ def test_order_by(self) -> None:
"""
Testing order list with parameter `order_by`
"""
@@ -843,7 +847,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
expected_user_list: List[str],
order_by: Optional[str],
dir: Optional[str] = None,
- ):
+ ) -> None:
"""Request the list of users in a certain order. Assert that order is what
we expect
Args:
@@ -870,7 +874,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertEqual(expected_user_list, returned_order)
self._check_fields(channel.json_body["users"])
- def _check_fields(self, content: List[JsonDict]):
+ def _check_fields(self, content: List[JsonDict]) -> None:
"""Checks that the expected user attributes are present in content
Args:
content: List that is checked for content
@@ -886,7 +890,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self.assertIn("avatar_url", u)
self.assertIn("creation_ts", u)
- def _create_users(self, number_users: int):
+ def _create_users(self, number_users: int) -> None:
"""
Create a number of users
Args:
@@ -908,7 +912,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -931,7 +935,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.store.user_add_threepid("@user:test", "email", "foo@bar.com", 0, 0)
)
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to deactivate users without authentication.
"""
@@ -940,7 +944,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_not_admin(self):
+ def test_requester_is_not_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -961,7 +965,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual("You are not a server admin", channel.json_body["error"])
- def test_user_does_not_exist(self):
+ def test_user_does_not_exist(self) -> None:
"""
Tests that deactivation for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
@@ -975,7 +979,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
- def test_erase_is_not_bool(self):
+ def test_erase_is_not_bool(self) -> None:
"""
If parameter `erase` is not boolean, return an error
"""
@@ -990,7 +994,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
- def test_user_is_not_local(self):
+ def test_user_is_not_local(self) -> None:
"""
Tests that deactivation for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
@@ -1001,7 +1005,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only deactivate local users", channel.json_body["error"])
- def test_deactivate_user_erase_true(self):
+ def test_deactivate_user_erase_true(self) -> None:
"""
Test deactivating a user and set `erase` to `true`
"""
@@ -1046,7 +1050,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self._is_erased("@user:test", True)
- def test_deactivate_user_erase_false(self):
+ def test_deactivate_user_erase_false(self) -> None:
"""
Test deactivating a user and set `erase` to `false`
"""
@@ -1091,7 +1095,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
self._is_erased("@user:test", False)
- def test_deactivate_user_erase_true_no_profile(self):
+ def test_deactivate_user_erase_true_no_profile(self) -> None:
"""
Test deactivating a user and set `erase` to `true`
if user has no profile information (stored in the database table `profiles`).
@@ -1162,7 +1166,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
sync.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.auth_handler = hs.get_auth_handler()
@@ -1185,7 +1189,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.url_prefix = "/_synapse/admin/v2/users/%s"
self.url_other_user = self.url_prefix % self.other_user
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -1210,7 +1214,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual("You are not a server admin", channel.json_body["error"])
- def test_user_does_not_exist(self):
+ def test_user_does_not_exist(self) -> None:
"""
Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
@@ -1224,7 +1228,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual("M_NOT_FOUND", channel.json_body["errcode"])
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
@@ -1319,7 +1323,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
- def test_get_user(self):
+ def test_get_user(self) -> None:
"""
Test a simple get of a user.
"""
@@ -1334,7 +1338,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("User", channel.json_body["displayname"])
self._check_fields(channel.json_body)
- def test_create_server_admin(self):
+ def test_create_server_admin(self) -> None:
"""
Check that a new admin user is created successfully.
"""
@@ -1383,7 +1387,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("mxc://fibble/wibble", channel.json_body["avatar_url"])
self._check_fields(channel.json_body)
- def test_create_user(self):
+ def test_create_user(self) -> None:
"""
Check that a new regular user is created successfully.
"""
@@ -1450,7 +1454,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
@override_config(
{"limit_usage_by_mau": True, "max_mau_value": 2, "mau_trial_days": 0}
)
- def test_create_user_mau_limit_reached_active_admin(self):
+ def test_create_user_mau_limit_reached_active_admin(self) -> None:
"""
Check that an admin can register a new user via the admin API
even if the MAU limit is reached.
@@ -1496,7 +1500,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
@override_config(
{"limit_usage_by_mau": True, "max_mau_value": 2, "mau_trial_days": 0}
)
- def test_create_user_mau_limit_reached_passive_admin(self):
+ def test_create_user_mau_limit_reached_passive_admin(self) -> None:
"""
Check that an admin can register a new user via the admin API
even if the MAU limit is reached.
@@ -1541,7 +1545,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
"public_baseurl": "https://example.com",
}
)
- def test_create_user_email_notif_for_new_users(self):
+ def test_create_user_email_notif_for_new_users(self) -> None:
"""
Check that a new regular user is created successfully and
got an email pusher.
@@ -1584,7 +1588,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
"public_baseurl": "https://example.com",
}
)
- def test_create_user_email_no_notif_for_new_users(self):
+ def test_create_user_email_no_notif_for_new_users(self) -> None:
"""
Check that a new regular user is created successfully and
got not an email pusher.
@@ -1615,7 +1619,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
pushers = list(pushers)
self.assertEqual(len(pushers), 0)
- def test_set_password(self):
+ def test_set_password(self) -> None:
"""
Test setting a new password for another user.
"""
@@ -1631,7 +1635,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self._check_fields(channel.json_body)
- def test_set_displayname(self):
+ def test_set_displayname(self) -> None:
"""
Test setting the displayname of another user.
"""
@@ -1659,7 +1663,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual("foobar", channel.json_body["displayname"])
- def test_set_threepid(self):
+ def test_set_threepid(self) -> None:
"""
Test setting threepid for an other user.
"""
@@ -1740,7 +1744,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, len(channel.json_body["threepids"]))
self._check_fields(channel.json_body)
- def test_set_duplicate_threepid(self):
+ def test_set_duplicate_threepid(self) -> None:
"""
Test setting the same threepid for a second user.
First user loses and second user gets mapping of this threepid.
@@ -1827,7 +1831,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, len(channel.json_body["threepids"]))
self._check_fields(channel.json_body)
- def test_set_external_id(self):
+ def test_set_external_id(self) -> None:
"""
Test setting external id for an other user.
"""
@@ -1925,7 +1929,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(0, len(channel.json_body["external_ids"]))
- def test_set_duplicate_external_id(self):
+ def test_set_duplicate_external_id(self) -> None:
"""
Test that setting the same external id for a second user fails and
external id from user must not be changed.
@@ -2048,7 +2052,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
)
self._check_fields(channel.json_body)
- def test_deactivate_user(self):
+ def test_deactivate_user(self) -> None:
"""
Test deactivating another user.
"""
@@ -2113,7 +2117,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertNotIn("password_hash", channel.json_body)
@override_config({"user_directory": {"enabled": True, "search_all_users": True}})
- def test_change_name_deactivate_user_user_directory(self):
+ def test_change_name_deactivate_user_user_directory(self) -> None:
"""
Test change profile information of a deactivated user and
check that it does not appear in user directory
@@ -2156,7 +2160,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
profile = self.get_success(self.store.get_user_in_directory(self.other_user))
self.assertIsNone(profile)
- def test_reactivate_user(self):
+ def test_reactivate_user(self) -> None:
"""
Test reactivating another user.
"""
@@ -2189,7 +2193,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertNotIn("password_hash", channel.json_body)
@override_config({"password_config": {"localdb_enabled": False}})
- def test_reactivate_user_localdb_disabled(self):
+ def test_reactivate_user_localdb_disabled(self) -> None:
"""
Test reactivating another user when using SSO.
"""
@@ -2223,7 +2227,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertNotIn("password_hash", channel.json_body)
@override_config({"password_config": {"enabled": False}})
- def test_reactivate_user_password_disabled(self):
+ def test_reactivate_user_password_disabled(self) -> None:
"""
Test reactivating another user when using SSO.
"""
@@ -2256,7 +2260,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
# This key was removed intentionally. Ensure it is not accidentally re-included.
self.assertNotIn("password_hash", channel.json_body)
- def test_set_user_as_admin(self):
+ def test_set_user_as_admin(self) -> None:
"""
Test setting the admin flag on a user.
"""
@@ -2284,7 +2288,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("@user:test", channel.json_body["name"])
self.assertTrue(channel.json_body["admin"])
- def test_set_user_type(self):
+ def test_set_user_type(self) -> None:
"""
Test changing user type.
"""
@@ -2335,7 +2339,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("@user:test", channel.json_body["name"])
self.assertIsNone(channel.json_body["user_type"])
- def test_accidental_deactivation_prevention(self):
+ def test_accidental_deactivation_prevention(self) -> None:
"""
Ensure an account can't accidentally be deactivated by using a str value
for the deactivated body parameter
@@ -2418,7 +2422,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
# This key was removed intentionally. Ensure it is not accidentally re-included.
self.assertNotIn("password_hash", channel.json_body)
- def _check_fields(self, content: JsonDict):
+ def _check_fields(self, content: JsonDict) -> None:
"""Checks that the expected user attributes are present in content
Args:
@@ -2448,7 +2452,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
room.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -2457,7 +2461,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.other_user
)
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to list rooms of an user without authentication.
"""
@@ -2466,7 +2470,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -2481,7 +2485,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_user_does_not_exist(self):
+ def test_user_does_not_exist(self) -> None:
"""
Tests that a lookup for a user that does not exist returns an empty list
"""
@@ -2496,7 +2500,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["joined_rooms"]))
- def test_user_is_not_local(self):
+ def test_user_is_not_local(self) -> None:
"""
Tests that a lookup for a user that is not a local and participates in no conversation returns an empty list
"""
@@ -2512,7 +2516,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["joined_rooms"]))
- def test_no_memberships(self):
+ def test_no_memberships(self) -> None:
"""
Tests that a normal lookup for rooms is successfully
if user has no memberships
@@ -2528,7 +2532,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["joined_rooms"]))
- def test_get_rooms(self):
+ def test_get_rooms(self) -> None:
"""
Tests that a normal lookup for rooms is successfully
"""
@@ -2549,7 +2553,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(number_rooms, channel.json_body["total"])
self.assertEqual(number_rooms, len(channel.json_body["joined_rooms"]))
- def test_get_rooms_with_nonlocal_user(self):
+ def test_get_rooms_with_nonlocal_user(self) -> None:
"""
Tests that a normal lookup for rooms is successful with a non-local user
"""
@@ -2604,7 +2608,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -2615,7 +2619,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.other_user
)
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to list pushers of an user without authentication.
"""
@@ -2624,7 +2628,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -2639,7 +2643,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_user_does_not_exist(self):
+ def test_user_does_not_exist(self) -> None:
"""
Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
@@ -2653,7 +2657,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
- def test_user_is_not_local(self):
+ def test_user_is_not_local(self) -> None:
"""
Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
@@ -2668,7 +2672,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only look up local users", channel.json_body["error"])
- def test_get_pushers(self):
+ def test_get_pushers(self) -> None:
"""
Tests that a normal lookup for pushers is successfully
"""
@@ -2732,7 +2736,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.media_repo = hs.get_media_repository_resource()
self.filepaths = MediaFilePaths(hs.config.media.media_store_path)
@@ -2746,7 +2750,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
)
@parameterized.expand(["GET", "DELETE"])
- def test_no_auth(self, method: str):
+ def test_no_auth(self, method: str) -> None:
"""Try to list media of an user without authentication."""
channel = self.make_request(method, self.url, {})
@@ -2754,7 +2758,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "DELETE"])
- def test_requester_is_no_admin(self, method: str):
+ def test_requester_is_no_admin(self, method: str) -> None:
"""If the user is not a server admin, an error is returned."""
other_user_token = self.login("user", "pass")
@@ -2768,7 +2772,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "DELETE"])
- def test_user_does_not_exist(self, method: str):
+ def test_user_does_not_exist(self, method: str) -> None:
"""Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND"""
url = "/_synapse/admin/v1/users/@unknown_person:test/media"
channel = self.make_request(
@@ -2781,7 +2785,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
@parameterized.expand(["GET", "DELETE"])
- def test_user_is_not_local(self, method: str):
+ def test_user_is_not_local(self, method: str) -> None:
"""Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST"""
url = "/_synapse/admin/v1/users/@unknown_person:unknown_domain/media"
@@ -2794,7 +2798,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only look up local users", channel.json_body["error"])
- def test_limit_GET(self):
+ def test_limit_GET(self) -> None:
"""Testing list of media with limit"""
number_media = 20
@@ -2813,7 +2817,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.json_body["next_token"], 5)
self._check_fields(channel.json_body["media"])
- def test_limit_DELETE(self):
+ def test_limit_DELETE(self) -> None:
"""Testing delete of media with limit"""
number_media = 20
@@ -2830,7 +2834,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.json_body["total"], 5)
self.assertEqual(len(channel.json_body["deleted_media"]), 5)
- def test_from_GET(self):
+ def test_from_GET(self) -> None:
"""Testing list of media with a defined starting point (from)"""
number_media = 20
@@ -2849,7 +2853,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["media"])
- def test_from_DELETE(self):
+ def test_from_DELETE(self) -> None:
"""Testing delete of media with a defined starting point (from)"""
number_media = 20
@@ -2866,7 +2870,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.json_body["total"], 15)
self.assertEqual(len(channel.json_body["deleted_media"]), 15)
- def test_limit_and_from_GET(self):
+ def test_limit_and_from_GET(self) -> None:
"""Testing list of media with a defined starting point and limit"""
number_media = 20
@@ -2885,7 +2889,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["media"]), 10)
self._check_fields(channel.json_body["media"])
- def test_limit_and_from_DELETE(self):
+ def test_limit_and_from_DELETE(self) -> None:
"""Testing delete of media with a defined starting point and limit"""
number_media = 20
@@ -2903,7 +2907,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["deleted_media"]), 10)
@parameterized.expand(["GET", "DELETE"])
- def test_invalid_parameter(self, method: str):
+ def test_invalid_parameter(self, method: str) -> None:
"""If parameters are invalid, an error is returned."""
# unkown order_by
channel = self.make_request(
@@ -2945,7 +2949,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
- def test_next_token(self):
+ def test_next_token(self) -> None:
"""
Testing that `next_token` appears at the right place
@@ -3010,7 +3014,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(channel.json_body["media"]), 1)
self.assertNotIn("next_token", channel.json_body)
- def test_user_has_no_media_GET(self):
+ def test_user_has_no_media_GET(self) -> None:
"""
Tests that a normal lookup for media is successfully
if user has no media created
@@ -3026,7 +3030,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["media"]))
- def test_user_has_no_media_DELETE(self):
+ def test_user_has_no_media_DELETE(self) -> None:
"""
Tests that a delete is successful if user has no media
"""
@@ -3041,7 +3045,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["deleted_media"]))
- def test_get_media(self):
+ def test_get_media(self) -> None:
"""Tests that a normal lookup for media is successful"""
number_media = 5
@@ -3060,7 +3064,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["media"])
- def test_delete_media(self):
+ def test_delete_media(self) -> None:
"""Tests that a normal delete of media is successful"""
number_media = 5
@@ -3089,7 +3093,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
for local_path in local_paths:
self.assertFalse(os.path.exists(local_path))
- def test_order_by(self):
+ def test_order_by(self) -> None:
"""
Testing order list with parameter `order_by`
"""
@@ -3252,7 +3256,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
return media_id
- def _check_fields(self, content: List[JsonDict]):
+ def _check_fields(self, content: List[JsonDict]) -> None:
"""Checks that the expected user attributes are present in content
Args:
content: List that is checked for content
@@ -3272,7 +3276,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
expected_media_list: List[str],
order_by: Optional[str],
dir: Optional[str] = None,
- ):
+ ) -> None:
"""Request the list of media in a certain order. Assert that order is what
we expect
Args:
@@ -3312,7 +3316,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
logout.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -3331,14 +3335,14 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
return channel.json_body["access_token"]
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""Try to login as a user without authentication."""
channel = self.make_request("POST", self.url, b"{}")
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_not_admin(self):
+ def test_not_admin(self) -> None:
"""Try to login as a user as a non-admin user."""
channel = self.make_request(
"POST", self.url, b"{}", access_token=self.other_user_tok
@@ -3346,7 +3350,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
- def test_send_event(self):
+ def test_send_event(self) -> None:
"""Test that sending event as a user works."""
# Create a room.
room_id = self.helper.create_room_as(self.other_user, tok=self.other_user_tok)
@@ -3360,7 +3364,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
event = self.get_success(self.store.get_event(event_id))
self.assertEqual(event.sender, self.other_user)
- def test_devices(self):
+ def test_devices(self) -> None:
"""Tests that logging in as a user doesn't create a new device for them."""
# Login in as the user
self._get_token()
@@ -3374,7 +3378,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
# We should only see the one device (from the login in `prepare`)
self.assertEqual(len(channel.json_body["devices"]), 1)
- def test_logout(self):
+ def test_logout(self) -> None:
"""Test that calling `/logout` with the token works."""
# Login in as the user
puppet_token = self._get_token()
@@ -3397,7 +3401,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
- def test_user_logout_all(self):
+ def test_user_logout_all(self) -> None:
"""Tests that the target user calling `/logout/all` does *not* expire
the token.
"""
@@ -3424,7 +3428,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
)
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
- def test_admin_logout_all(self):
+ def test_admin_logout_all(self) -> None:
"""Tests that the admin user calling `/logout/all` does expire the
token.
"""
@@ -3464,7 +3468,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
"form_secret": "123secret",
}
)
- def test_consent(self):
+ def test_consent(self) -> None:
"""Test that sending a message is not subject to the privacy policies."""
# Have the admin user accept the terms.
self.get_success(self.store.user_set_consent_version(self.admin_user, "1.0"))
@@ -3492,7 +3496,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
@override_config(
{"limit_usage_by_mau": True, "max_mau_value": 1, "mau_trial_days": 0}
)
- def test_mau_limit(self):
+ def test_mau_limit(self) -> None:
# Create a room as the admin user. This will bump the monthly active users to 1.
room_id = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
@@ -3524,14 +3528,14 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
self.other_user = self.register_user("user", "pass")
- self.url = self.url_prefix % self.other_user
+ self.url = self.url_prefix % self.other_user # type: ignore[attr-defined]
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to get information of an user without authentication.
"""
@@ -3539,7 +3543,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_not_admin(self):
+ def test_requester_is_not_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -3554,11 +3558,11 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_user_is_not_local(self):
+ def test_user_is_not_local(self) -> None:
"""
Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
- url = self.url_prefix % "@unknown_person:unknown_domain"
+ url = self.url_prefix % "@unknown_person:unknown_domain" # type: ignore[attr-defined]
channel = self.make_request(
"GET",
@@ -3568,7 +3572,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only whois a local user", channel.json_body["error"])
- def test_get_whois_admin(self):
+ def test_get_whois_admin(self) -> None:
"""
The lookup should succeed for an admin.
"""
@@ -3581,7 +3585,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(self.other_user, channel.json_body["user_id"])
self.assertIn("devices", channel.json_body)
- def test_get_whois_user(self):
+ def test_get_whois_user(self) -> None:
"""
The lookup should succeed for a normal user looking up their own information.
"""
@@ -3604,7 +3608,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -3617,7 +3621,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
)
@parameterized.expand(["POST", "DELETE"])
- def test_no_auth(self, method: str):
+ def test_no_auth(self, method: str) -> None:
"""
Try to get information of an user without authentication.
"""
@@ -3626,7 +3630,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["POST", "DELETE"])
- def test_requester_is_not_admin(self, method: str):
+ def test_requester_is_not_admin(self, method: str) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -3637,7 +3641,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand(["POST", "DELETE"])
- def test_user_is_not_local(self, method: str):
+ def test_user_is_not_local(self, method: str) -> None:
"""
Tests that shadow-banning for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
@@ -3646,7 +3650,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
channel = self.make_request(method, url, access_token=self.admin_user_tok)
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
- def test_success(self):
+ def test_success(self) -> None:
"""
Shadow-banning should succeed for an admin.
"""
@@ -3682,7 +3686,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -3695,7 +3699,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
)
@parameterized.expand(["GET", "POST", "DELETE"])
- def test_no_auth(self, method: str):
+ def test_no_auth(self, method: str) -> None:
"""
Try to get information of a user without authentication.
"""
@@ -3705,7 +3709,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "POST", "DELETE"])
- def test_requester_is_no_admin(self, method: str):
+ def test_requester_is_no_admin(self, method: str) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -3721,7 +3725,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "POST", "DELETE"])
- def test_user_does_not_exist(self, method: str):
+ def test_user_does_not_exist(self, method: str) -> None:
"""
Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
@@ -3743,7 +3747,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
("DELETE", "Only local users can be ratelimited"),
]
)
- def test_user_is_not_local(self, method: str, error_msg: str):
+ def test_user_is_not_local(self, method: str, error_msg: str) -> None:
"""
Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
@@ -3760,7 +3764,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(error_msg, channel.json_body["error"])
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
@@ -3808,7 +3812,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
- def test_return_zero_when_null(self):
+ def test_return_zero_when_null(self) -> None:
"""
If values in database are `null` API should return an int `0`
"""
@@ -3834,7 +3838,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.assertEqual(0, channel.json_body["messages_per_second"])
self.assertEqual(0, channel.json_body["burst_count"])
- def test_success(self):
+ def test_success(self) -> None:
"""
Rate-limiting (set/update/delete) should succeed for an admin.
"""
@@ -3908,7 +3912,7 @@ class AccountDataTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs) -> None:
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
diff --git a/tests/rest/admin/test_username_available.py b/tests/rest/admin/test_username_available.py
index 7978626e71..b21f6d4689 100644
--- a/tests/rest/admin/test_username_available.py
+++ b/tests/rest/admin/test_username_available.py
@@ -14,9 +14,13 @@
from http import HTTPStatus
+from twisted.test.proto_helpers import MemoryReactor
+
import synapse.rest.admin
from synapse.api.errors import Codes, SynapseError
from synapse.rest.client import login
+from synapse.server import HomeServer
+from synapse.util import Clock
from tests import unittest
@@ -28,11 +32,11 @@ class UsernameAvailableTestCase(unittest.HomeserverTestCase):
]
url = "/_synapse/admin/v1/username_available"
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
- async def check_username(username):
+ async def check_username(username: str) -> bool:
if username == "allowed":
return True
raise SynapseError(
@@ -44,24 +48,24 @@ class UsernameAvailableTestCase(unittest.HomeserverTestCase):
handler = self.hs.get_registration_handler()
handler.check_username = check_username
- def test_username_available(self):
+ def test_username_available(self) -> None:
"""
The endpoint should return a HTTPStatus.OK response if the username does not exist
"""
url = "%s?username=%s" % (self.url, "allowed")
- channel = self.make_request("GET", url, None, self.admin_user_tok)
+ channel = self.make_request("GET", url, access_token=self.admin_user_tok)
self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertTrue(channel.json_body["available"])
- def test_username_unavailable(self):
+ def test_username_unavailable(self) -> None:
"""
The endpoint should return a HTTPStatus.OK response if the username does not exist
"""
url = "%s?username=%s" % (self.url, "disallowed")
- channel = self.make_request("GET", url, None, self.admin_user_tok)
+ channel = self.make_request("GET", url, access_token=self.admin_user_tok)
self.assertEqual(
HTTPStatus.BAD_REQUEST,
|