diff --git a/changelog.d/12108.misc b/changelog.d/12108.misc
index 0360dbd61e..b67a701dbb 100644
--- a/changelog.d/12108.misc
+++ b/changelog.d/12108.misc
@@ -1 +1 @@
-Add type hints to `tests/rest/client`.
+Add type hints to tests files.
diff --git a/changelog.d/12146.misc b/changelog.d/12146.misc
index 3ca7c47212..b67a701dbb 100644
--- a/changelog.d/12146.misc
+++ b/changelog.d/12146.misc
@@ -1 +1 @@
-Add type hints to `tests/rest`.
+Add type hints to tests files.
diff --git a/changelog.d/12207.misc b/changelog.d/12207.misc
new file mode 100644
index 0000000000..b67a701dbb
--- /dev/null
+++ b/changelog.d/12207.misc
@@ -0,0 +1 @@
+Add type hints to tests files.
diff --git a/tests/handlers/test_admin.py b/tests/handlers/test_admin.py
index abf2a0fe0d..c1579dac61 100644
--- a/tests/handlers/test_admin.py
+++ b/tests/handlers/test_admin.py
@@ -15,11 +15,15 @@
from collections import Counter
from unittest.mock import Mock
+from twisted.test.proto_helpers import MemoryReactor
+
import synapse.rest.admin
import synapse.storage
from synapse.api.constants import EventTypes, JoinRules
from synapse.api.room_versions import RoomVersions
from synapse.rest.client import knock, login, room
+from synapse.server import HomeServer
+from synapse.util import Clock
from tests import unittest
@@ -32,7 +36,7 @@ class ExfiltrateData(unittest.HomeserverTestCase):
knock.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_handler = hs.get_admin_handler()
self.user1 = self.register_user("user1", "password")
@@ -41,7 +45,7 @@ class ExfiltrateData(unittest.HomeserverTestCase):
self.user2 = self.register_user("user2", "password")
self.token2 = self.login("user2", "password")
- def test_single_public_joined_room(self):
+ def test_single_public_joined_room(self) -> None:
"""Test that we write *all* events for a public room"""
room_id = self.helper.create_room_as(
self.user1, tok=self.token1, is_public=True
@@ -74,7 +78,7 @@ class ExfiltrateData(unittest.HomeserverTestCase):
self.assertEqual(counter[(EventTypes.Member, self.user1)], 1)
self.assertEqual(counter[(EventTypes.Member, self.user2)], 1)
- def test_single_private_joined_room(self):
+ def test_single_private_joined_room(self) -> None:
"""Tests that we correctly write state when we can't see all events in
a room.
"""
@@ -112,7 +116,7 @@ class ExfiltrateData(unittest.HomeserverTestCase):
self.assertEqual(counter[(EventTypes.Member, self.user1)], 1)
self.assertEqual(counter[(EventTypes.Member, self.user2)], 1)
- def test_single_left_room(self):
+ def test_single_left_room(self) -> None:
"""Tests that we don't see events in the room after we leave."""
room_id = self.helper.create_room_as(self.user1, tok=self.token1)
self.helper.send(room_id, body="Hello!", tok=self.token1)
@@ -144,7 +148,7 @@ class ExfiltrateData(unittest.HomeserverTestCase):
self.assertEqual(counter[(EventTypes.Member, self.user1)], 1)
self.assertEqual(counter[(EventTypes.Member, self.user2)], 2)
- def test_single_left_rejoined_private_room(self):
+ def test_single_left_rejoined_private_room(self) -> None:
"""Tests that see the correct events in private rooms when we
repeatedly join and leave.
"""
@@ -185,7 +189,7 @@ class ExfiltrateData(unittest.HomeserverTestCase):
self.assertEqual(counter[(EventTypes.Member, self.user1)], 1)
self.assertEqual(counter[(EventTypes.Member, self.user2)], 3)
- def test_invite(self):
+ def test_invite(self) -> None:
"""Tests that pending invites get handled correctly."""
room_id = self.helper.create_room_as(self.user1, tok=self.token1)
self.helper.send(room_id, body="Hello!", tok=self.token1)
@@ -204,7 +208,7 @@ class ExfiltrateData(unittest.HomeserverTestCase):
self.assertEqual(args[1].content["membership"], "invite")
self.assertTrue(args[2]) # Assert there is at least one bit of state
- def test_knock(self):
+ def test_knock(self) -> None:
"""Tests that knock get handled correctly."""
# create a knockable v7 room
room_id = self.helper.create_room_as(
diff --git a/tests/handlers/test_auth.py b/tests/handlers/test_auth.py
index 0c6e55e725..67a7829769 100644
--- a/tests/handlers/test_auth.py
+++ b/tests/handlers/test_auth.py
@@ -15,8 +15,12 @@ from unittest.mock import Mock
import pymacaroons
+from twisted.test.proto_helpers import MemoryReactor
+
from synapse.api.errors import AuthError, ResourceLimitError
from synapse.rest import admin
+from synapse.server import HomeServer
+from synapse.util import Clock
from tests import unittest
from tests.test_utils import make_awaitable
@@ -27,7 +31,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
admin.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.auth_handler = hs.get_auth_handler()
self.macaroon_generator = hs.get_macaroon_generator()
@@ -42,23 +46,23 @@ class AuthTestCase(unittest.HomeserverTestCase):
self.user1 = self.register_user("a_user", "pass")
- def test_macaroon_caveats(self):
+ def test_macaroon_caveats(self) -> None:
token = self.macaroon_generator.generate_guest_access_token("a_user")
macaroon = pymacaroons.Macaroon.deserialize(token)
- def verify_gen(caveat):
+ def verify_gen(caveat: str) -> bool:
return caveat == "gen = 1"
- def verify_user(caveat):
+ def verify_user(caveat: str) -> bool:
return caveat == "user_id = a_user"
- def verify_type(caveat):
+ def verify_type(caveat: str) -> bool:
return caveat == "type = access"
- def verify_nonce(caveat):
+ def verify_nonce(caveat: str) -> bool:
return caveat.startswith("nonce =")
- def verify_guest(caveat):
+ def verify_guest(caveat: str) -> bool:
return caveat == "guest = true"
v = pymacaroons.Verifier()
@@ -69,7 +73,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
v.satisfy_general(verify_guest)
v.verify(macaroon, self.hs.config.key.macaroon_secret_key)
- def test_short_term_login_token_gives_user_id(self):
+ def test_short_term_login_token_gives_user_id(self) -> None:
token = self.macaroon_generator.generate_short_term_login_token(
self.user1, "", duration_in_ms=5000
)
@@ -84,7 +88,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
AuthError,
)
- def test_short_term_login_token_gives_auth_provider(self):
+ def test_short_term_login_token_gives_auth_provider(self) -> None:
token = self.macaroon_generator.generate_short_term_login_token(
self.user1, auth_provider_id="my_idp"
)
@@ -92,7 +96,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
self.assertEqual(self.user1, res.user_id)
self.assertEqual("my_idp", res.auth_provider_id)
- def test_short_term_login_token_cannot_replace_user_id(self):
+ def test_short_term_login_token_cannot_replace_user_id(self) -> None:
token = self.macaroon_generator.generate_short_term_login_token(
self.user1, "", duration_in_ms=5000
)
@@ -112,7 +116,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
AuthError,
)
- def test_mau_limits_disabled(self):
+ def test_mau_limits_disabled(self) -> None:
self.auth_blocking._limit_usage_by_mau = False
# Ensure does not throw exception
self.get_success(
@@ -127,7 +131,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
)
)
- def test_mau_limits_exceeded_large(self):
+ def test_mau_limits_exceeded_large(self) -> None:
self.auth_blocking._limit_usage_by_mau = True
self.hs.get_datastores().main.get_monthly_active_count = Mock(
return_value=make_awaitable(self.large_number_of_users)
@@ -150,7 +154,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
ResourceLimitError,
)
- def test_mau_limits_parity(self):
+ def test_mau_limits_parity(self) -> None:
# Ensure we're not at the unix epoch.
self.reactor.advance(1)
self.auth_blocking._limit_usage_by_mau = True
@@ -189,7 +193,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
)
)
- def test_mau_limits_not_exceeded(self):
+ def test_mau_limits_not_exceeded(self) -> None:
self.auth_blocking._limit_usage_by_mau = True
self.hs.get_datastores().main.get_monthly_active_count = Mock(
@@ -211,7 +215,7 @@ class AuthTestCase(unittest.HomeserverTestCase):
)
)
- def _get_macaroon(self):
+ def _get_macaroon(self) -> pymacaroons.Macaroon:
token = self.macaroon_generator.generate_short_term_login_token(
self.user1, "", duration_in_ms=5000
)
diff --git a/tests/handlers/test_deactivate_account.py b/tests/handlers/test_deactivate_account.py
index ddda36c5a9..3a10791226 100644
--- a/tests/handlers/test_deactivate_account.py
+++ b/tests/handlers/test_deactivate_account.py
@@ -39,7 +39,7 @@ class DeactivateAccountTestCase(HomeserverTestCase):
self.user = self.register_user("user", "pass")
self.token = self.login("user", "pass")
- def _deactivate_my_account(self):
+ def _deactivate_my_account(self) -> None:
"""
Deactivates the account `self.user` using `self.token` and asserts
that it returns a 200 success code.
diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py
index 683677fd07..01ea7d2a42 100644
--- a/tests/handlers/test_device.py
+++ b/tests/handlers/test_device.py
@@ -14,9 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import synapse.api.errors
-import synapse.handlers.device
-import synapse.storage
+from typing import Optional
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.api.errors import NotFoundError, SynapseError
+from synapse.handlers.device import MAX_DEVICE_DISPLAY_NAME_LEN
+from synapse.server import HomeServer
+from synapse.util import Clock
from tests import unittest
@@ -25,28 +30,27 @@ user2 = "@theresa:bbb"
class DeviceTestCase(unittest.HomeserverTestCase):
- def make_homeserver(self, reactor, clock):
+ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
hs = self.setup_test_homeserver("server", federation_http_client=None)
self.handler = hs.get_device_handler()
self.store = hs.get_datastores().main
return hs
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
# These tests assume that it starts 1000 seconds in.
self.reactor.advance(1000)
- def test_device_is_created_with_invalid_name(self):
+ def test_device_is_created_with_invalid_name(self) -> None:
self.get_failure(
self.handler.check_device_registered(
user_id="@boris:foo",
device_id="foo",
- initial_device_display_name="a"
- * (synapse.handlers.device.MAX_DEVICE_DISPLAY_NAME_LEN + 1),
+ initial_device_display_name="a" * (MAX_DEVICE_DISPLAY_NAME_LEN + 1),
),
- synapse.api.errors.SynapseError,
+ SynapseError,
)
- def test_device_is_created_if_doesnt_exist(self):
+ def test_device_is_created_if_doesnt_exist(self) -> None:
res = self.get_success(
self.handler.check_device_registered(
user_id="@boris:foo",
@@ -59,7 +63,7 @@ class DeviceTestCase(unittest.HomeserverTestCase):
dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco"))
self.assertEqual(dev["display_name"], "display name")
- def test_device_is_preserved_if_exists(self):
+ def test_device_is_preserved_if_exists(self) -> None:
res1 = self.get_success(
self.handler.check_device_registered(
user_id="@boris:foo",
@@ -81,7 +85,7 @@ class DeviceTestCase(unittest.HomeserverTestCase):
dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco"))
self.assertEqual(dev["display_name"], "display name")
- def test_device_id_is_made_up_if_unspecified(self):
+ def test_device_id_is_made_up_if_unspecified(self) -> None:
device_id = self.get_success(
self.handler.check_device_registered(
user_id="@theresa:foo",
@@ -93,7 +97,7 @@ class DeviceTestCase(unittest.HomeserverTestCase):
dev = self.get_success(self.handler.store.get_device("@theresa:foo", device_id))
self.assertEqual(dev["display_name"], "display")
- def test_get_devices_by_user(self):
+ def test_get_devices_by_user(self) -> None:
self._record_users()
res = self.get_success(self.handler.get_devices_by_user(user1))
@@ -131,7 +135,7 @@ class DeviceTestCase(unittest.HomeserverTestCase):
device_map["abc"],
)
- def test_get_device(self):
+ def test_get_device(self) -> None:
self._record_users()
res = self.get_success(self.handler.get_device(user1, "abc"))
@@ -146,21 +150,19 @@ class DeviceTestCase(unittest.HomeserverTestCase):
res,
)
- def test_delete_device(self):
+ def test_delete_device(self) -> None:
self._record_users()
# delete the device
self.get_success(self.handler.delete_device(user1, "abc"))
# check the device was deleted
- self.get_failure(
- self.handler.get_device(user1, "abc"), synapse.api.errors.NotFoundError
- )
+ self.get_failure(self.handler.get_device(user1, "abc"), NotFoundError)
# we'd like to check the access token was invalidated, but that's a
# bit of a PITA.
- def test_delete_device_and_device_inbox(self):
+ def test_delete_device_and_device_inbox(self) -> None:
self._record_users()
# add an device_inbox
@@ -191,7 +193,7 @@ class DeviceTestCase(unittest.HomeserverTestCase):
)
self.assertIsNone(res)
- def test_update_device(self):
+ def test_update_device(self) -> None:
self._record_users()
update = {"display_name": "new display"}
@@ -200,32 +202,29 @@ class DeviceTestCase(unittest.HomeserverTestCase):
res = self.get_success(self.handler.get_device(user1, "abc"))
self.assertEqual(res["display_name"], "new display")
- def test_update_device_too_long_display_name(self):
+ def test_update_device_too_long_display_name(self) -> None:
"""Update a device with a display name that is invalid (too long)."""
self._record_users()
# Request to update a device display name with a new value that is longer than allowed.
- update = {
- "display_name": "a"
- * (synapse.handlers.device.MAX_DEVICE_DISPLAY_NAME_LEN + 1)
- }
+ update = {"display_name": "a" * (MAX_DEVICE_DISPLAY_NAME_LEN + 1)}
self.get_failure(
self.handler.update_device(user1, "abc", update),
- synapse.api.errors.SynapseError,
+ SynapseError,
)
# Ensure the display name was not updated.
res = self.get_success(self.handler.get_device(user1, "abc"))
self.assertEqual(res["display_name"], "display 2")
- def test_update_unknown_device(self):
+ def test_update_unknown_device(self) -> None:
update = {"display_name": "new_display"}
self.get_failure(
self.handler.update_device("user_id", "unknown_device_id", update),
- synapse.api.errors.NotFoundError,
+ NotFoundError,
)
- def _record_users(self):
+ def _record_users(self) -> None:
# check this works for both devices which have a recorded client_ip,
# and those which don't.
self._record_user(user1, "xyz", "display 0")
@@ -238,8 +237,13 @@ class DeviceTestCase(unittest.HomeserverTestCase):
self.reactor.advance(10000)
def _record_user(
- self, user_id, device_id, display_name, access_token=None, ip=None
- ):
+ self,
+ user_id: str,
+ device_id: str,
+ display_name: str,
+ access_token: Optional[str] = None,
+ ip: Optional[str] = None,
+ ) -> None:
device_id = self.get_success(
self.handler.check_device_registered(
user_id=user_id,
@@ -248,7 +252,7 @@ class DeviceTestCase(unittest.HomeserverTestCase):
)
)
- if ip is not None:
+ if access_token is not None and ip is not None:
self.get_success(
self.store.insert_client_ip(
user_id, access_token, ip, "user_agent", device_id
@@ -258,7 +262,7 @@ class DeviceTestCase(unittest.HomeserverTestCase):
class DehydrationTestCase(unittest.HomeserverTestCase):
- def make_homeserver(self, reactor, clock):
+ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
hs = self.setup_test_homeserver("server", federation_http_client=None)
self.handler = hs.get_device_handler()
self.registration = hs.get_registration_handler()
@@ -266,7 +270,7 @@ class DehydrationTestCase(unittest.HomeserverTestCase):
self.store = hs.get_datastores().main
return hs
- def test_dehydrate_and_rehydrate_device(self):
+ def test_dehydrate_and_rehydrate_device(self) -> None:
user_id = "@boris:dehydration"
self.get_success(self.store.register_user(user_id, "foobar"))
@@ -303,7 +307,7 @@ class DehydrationTestCase(unittest.HomeserverTestCase):
access_token=access_token,
device_id="not the right device ID",
),
- synapse.api.errors.NotFoundError,
+ NotFoundError,
)
# dehydrating the right devices should succeed and change our device ID
@@ -331,7 +335,7 @@ class DehydrationTestCase(unittest.HomeserverTestCase):
# make sure that the device ID that we were initially assigned no longer exists
self.get_failure(
self.handler.get_device(user_id, device_id),
- synapse.api.errors.NotFoundError,
+ NotFoundError,
)
# make sure that there's no device available for dehydrating now
|