summary refs log tree commit diff
path: root/tests/rest/admin/test_user.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/rest/admin/test_user.py')
-rw-r--r--tests/rest/admin/test_user.py613
1 files changed, 514 insertions, 99 deletions
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py

index 04599c2fcf..ba26895391 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py
@@ -28,6 +28,7 @@ from synapse.api.errors import Codes, HttpResponseException, ResourceLimitError from synapse.api.room_versions import RoomVersions from synapse.rest.client.v1 import login, logout, profile, room from synapse.rest.client.v2_alpha import devices, sync +from synapse.types import JsonDict from tests import unittest from tests.test_utils import make_awaitable @@ -468,13 +469,6 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") - self.user1 = self.register_user( - "user1", "pass1", admin=False, displayname="Name 1" - ) - self.user2 = self.register_user( - "user2", "pass2", admin=False, displayname="Name 2" - ) - def test_no_auth(self): """ Try to list users without authentication. @@ -488,6 +482,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): """ If the user is not a server admin, an error is returned. """ + self._create_users(1) other_user_token = self.login("user1", "pass1") channel = self.make_request("GET", self.url, access_token=other_user_token) @@ -499,6 +494,8 @@ class UsersListTestCase(unittest.HomeserverTestCase): """ List all users, including deactivated users. """ + self._create_users(2) + channel = self.make_request( "GET", self.url + "?deactivated=true", @@ -511,14 +508,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): self.assertEqual(3, channel.json_body["total"]) # Check that all fields are available - for u in channel.json_body["users"]: - self.assertIn("name", u) - self.assertIn("is_guest", u) - self.assertIn("admin", u) - self.assertIn("user_type", u) - self.assertIn("deactivated", u) - self.assertIn("displayname", u) - self.assertIn("avatar_url", u) + self._check_fields(channel.json_body["users"]) def test_search_term(self): """Test that searching for a users works correctly""" @@ -538,9 +528,14 @@ class UsersListTestCase(unittest.HomeserverTestCase): search_field: Field which is to request: `name` or `user_id` expected_http_code: The expected http code for the request """ - url = self.url + "?%s=%s" % (search_field, search_term,) + url = self.url + "?%s=%s" % ( + search_field, + search_term, + ) channel = self.make_request( - "GET", url.encode("ascii"), access_token=self.admin_user_tok, + "GET", + url.encode("ascii"), + access_token=self.admin_user_tok, ) self.assertEqual(expected_http_code, channel.code, msg=channel.json_body) @@ -549,6 +544,7 @@ class UsersListTestCase(unittest.HomeserverTestCase): # Check that users were returned self.assertTrue("users" in channel.json_body) + self._check_fields(channel.json_body["users"]) users = channel.json_body["users"] # Check that the expected number of users were returned @@ -561,25 +557,30 @@ class UsersListTestCase(unittest.HomeserverTestCase): u = users[0] self.assertEqual(expected_user_id, u["name"]) + self._create_users(2) + + user1 = "@user1:test" + user2 = "@user2:test" + # Perform search tests - _search_test(self.user1, "er1") - _search_test(self.user1, "me 1") + _search_test(user1, "er1") + _search_test(user1, "me 1") - _search_test(self.user2, "er2") - _search_test(self.user2, "me 2") + _search_test(user2, "er2") + _search_test(user2, "me 2") - _search_test(self.user1, "er1", "user_id") - _search_test(self.user2, "er2", "user_id") + _search_test(user1, "er1", "user_id") + _search_test(user2, "er2", "user_id") # Test case insensitive - _search_test(self.user1, "ER1") - _search_test(self.user1, "NAME 1") + _search_test(user1, "ER1") + _search_test(user1, "NAME 1") - _search_test(self.user2, "ER2") - _search_test(self.user2, "NAME 2") + _search_test(user2, "ER2") + _search_test(user2, "NAME 2") - _search_test(self.user1, "ER1", "user_id") - _search_test(self.user2, "ER2", "user_id") + _search_test(user1, "ER1", "user_id") + _search_test(user2, "ER2", "user_id") _search_test(None, "foo") _search_test(None, "bar") @@ -587,6 +588,205 @@ class UsersListTestCase(unittest.HomeserverTestCase): _search_test(None, "foo", "user_id") _search_test(None, "bar", "user_id") + def test_invalid_parameter(self): + """ + If parameters are invalid, an error is returned. + """ + + # negative limit + channel = self.make_request( + "GET", + self.url + "?limit=-5", + access_token=self.admin_user_tok, + ) + + self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"]) + + # negative from + channel = self.make_request( + "GET", + self.url + "?from=-5", + access_token=self.admin_user_tok, + ) + + self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"]) + + # invalid guests + channel = self.make_request( + "GET", + self.url + "?guests=not_bool", + access_token=self.admin_user_tok, + ) + + self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"]) + + # invalid deactivated + channel = self.make_request( + "GET", + self.url + "?deactivated=not_bool", + access_token=self.admin_user_tok, + ) + + self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"]) + + def test_limit(self): + """ + Testing list of users with limit + """ + + number_users = 20 + # Create one less user (since there's already an admin user). + self._create_users(number_users - 1) + + channel = self.make_request( + "GET", + self.url + "?limit=5", + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(channel.json_body["total"], number_users) + self.assertEqual(len(channel.json_body["users"]), 5) + self.assertEqual(channel.json_body["next_token"], "5") + self._check_fields(channel.json_body["users"]) + + def test_from(self): + """ + Testing list of users with a defined starting point (from) + """ + + number_users = 20 + # Create one less user (since there's already an admin user). + self._create_users(number_users - 1) + + channel = self.make_request( + "GET", + self.url + "?from=5", + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(channel.json_body["total"], number_users) + self.assertEqual(len(channel.json_body["users"]), 15) + self.assertNotIn("next_token", channel.json_body) + self._check_fields(channel.json_body["users"]) + + def test_limit_and_from(self): + """ + Testing list of users with a defined starting point and limit + """ + + number_users = 20 + # Create one less user (since there's already an admin user). + self._create_users(number_users - 1) + + channel = self.make_request( + "GET", + self.url + "?from=5&limit=10", + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(channel.json_body["total"], number_users) + self.assertEqual(channel.json_body["next_token"], "15") + self.assertEqual(len(channel.json_body["users"]), 10) + self._check_fields(channel.json_body["users"]) + + def test_next_token(self): + """ + Testing that `next_token` appears at the right place + """ + + number_users = 20 + # Create one less user (since there's already an admin user). + self._create_users(number_users - 1) + + # `next_token` does not appear + # Number of results is the number of entries + channel = self.make_request( + "GET", + self.url + "?limit=20", + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(channel.json_body["total"], number_users) + self.assertEqual(len(channel.json_body["users"]), number_users) + self.assertNotIn("next_token", channel.json_body) + + # `next_token` does not appear + # Number of max results is larger than the number of entries + channel = self.make_request( + "GET", + self.url + "?limit=21", + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(channel.json_body["total"], number_users) + self.assertEqual(len(channel.json_body["users"]), number_users) + self.assertNotIn("next_token", channel.json_body) + + # `next_token` does appear + # Number of max results is smaller than the number of entries + channel = self.make_request( + "GET", + self.url + "?limit=19", + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(channel.json_body["total"], number_users) + self.assertEqual(len(channel.json_body["users"]), 19) + self.assertEqual(channel.json_body["next_token"], "19") + + # Check + # Set `from` to value of `next_token` for request remaining entries + # `next_token` does not appear + channel = self.make_request( + "GET", + self.url + "?from=19", + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(channel.json_body["total"], number_users) + self.assertEqual(len(channel.json_body["users"]), 1) + self.assertNotIn("next_token", channel.json_body) + + def _check_fields(self, content: JsonDict): + """Checks that the expected user attributes are present in content + Args: + content: List that is checked for content + """ + for u in content: + self.assertIn("name", u) + self.assertIn("is_guest", u) + self.assertIn("admin", u) + self.assertIn("user_type", u) + self.assertIn("deactivated", u) + self.assertIn("shadow_banned", u) + self.assertIn("displayname", u) + self.assertIn("avatar_url", u) + + def _create_users(self, number_users: int): + """ + Create a number of users + Args: + number_users: Number of users to be created + """ + for i in range(1, number_users + 1): + self.register_user( + "user%d" % i, + "pass%d" % i, + admin=False, + displayname="Name %d" % i, + ) + class DeactivateAccountTestCase(unittest.HomeserverTestCase): @@ -639,7 +839,10 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self.assertEqual("You are not a server admin", channel.json_body["error"]) channel = self.make_request( - "POST", url, access_token=self.other_user_token, content=b"{}", + "POST", + url, + access_token=self.other_user_token, + content=b"{}", ) self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) @@ -693,7 +896,9 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): # Get user channel = self.make_request( - "GET", self.url_other_user, access_token=self.admin_user_tok, + "GET", + self.url_other_user, + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -717,7 +922,9 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): # Get user channel = self.make_request( - "GET", self.url_other_user, access_token=self.admin_user_tok, + "GET", + self.url_other_user, + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -736,7 +943,9 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): # Get user channel = self.make_request( - "GET", self.url_other_user, access_token=self.admin_user_tok, + "GET", + self.url_other_user, + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -760,7 +969,9 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): # Get user channel = self.make_request( - "GET", self.url_other_user, access_token=self.admin_user_tok, + "GET", + self.url_other_user, + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -773,8 +984,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self._is_erased("@user:test", False) def _is_erased(self, user_id: str, expect: bool) -> None: - """Assert that the user is erased or not - """ + """Assert that the user is erased or not""" d = self.store.is_user_erased(user_id) if expect: self.assertTrue(self.get_success(d)) @@ -808,13 +1018,20 @@ class UserRestTestCase(unittest.HomeserverTestCase): """ url = "/_synapse/admin/v2/users/@bob:test" - channel = self.make_request("GET", url, access_token=self.other_user_token,) + channel = self.make_request( + "GET", + url, + access_token=self.other_user_token, + ) self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual("You are not a server admin", channel.json_body["error"]) channel = self.make_request( - "PUT", url, access_token=self.other_user_token, content=b"{}", + "PUT", + url, + access_token=self.other_user_token, + content=b"{}", ) self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) @@ -867,7 +1084,11 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("mxc://fibble/wibble", channel.json_body["avatar_url"]) # Get user - channel = self.make_request("GET", url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual("@bob:test", channel.json_body["name"]) @@ -912,7 +1133,11 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("mxc://fibble/wibble", channel.json_body["avatar_url"]) # Get user - channel = self.make_request("GET", url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual("@bob:test", channel.json_body["name"]) @@ -922,6 +1147,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(False, channel.json_body["admin"]) self.assertEqual(False, channel.json_body["is_guest"]) self.assertEqual(False, channel.json_body["deactivated"]) + self.assertEqual(False, channel.json_body["shadow_banned"]) self.assertEqual("mxc://fibble/wibble", channel.json_body["avatar_url"]) @override_config( @@ -1137,7 +1363,9 @@ class UserRestTestCase(unittest.HomeserverTestCase): # Get user channel = self.make_request( - "GET", self.url_other_user, access_token=self.admin_user_tok, + "GET", + self.url_other_user, + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1168,7 +1396,9 @@ class UserRestTestCase(unittest.HomeserverTestCase): # Get user channel = self.make_request( - "GET", self.url_other_user, access_token=self.admin_user_tok, + "GET", + self.url_other_user, + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1191,7 +1421,9 @@ class UserRestTestCase(unittest.HomeserverTestCase): # Get user channel = self.make_request( - "GET", self.url_other_user, access_token=self.admin_user_tok, + "GET", + self.url_other_user, + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1221,7 +1453,9 @@ class UserRestTestCase(unittest.HomeserverTestCase): # Get user channel = self.make_request( - "GET", self.url_other_user, access_token=self.admin_user_tok, + "GET", + self.url_other_user, + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1319,7 +1553,9 @@ class UserRestTestCase(unittest.HomeserverTestCase): # Get user channel = self.make_request( - "GET", self.url_other_user, access_token=self.admin_user_tok, + "GET", + self.url_other_user, + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1348,7 +1584,9 @@ class UserRestTestCase(unittest.HomeserverTestCase): # Get user channel = self.make_request( - "GET", self.url_other_user, access_token=self.admin_user_tok, + "GET", + self.url_other_user, + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1377,7 +1615,11 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("bob", channel.json_body["displayname"]) # Get user - channel = self.make_request("GET", url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual("@bob:test", channel.json_body["name"]) @@ -1397,7 +1639,11 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) # Check user is not deactivated - channel = self.make_request("GET", url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual("@bob:test", channel.json_body["name"]) @@ -1407,8 +1653,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(0, channel.json_body["deactivated"]) def _is_erased(self, user_id, expect): - """Assert that the user is erased or not - """ + """Assert that the user is erased or not""" d = self.store.is_user_erased(user_id) if expect: self.assertTrue(self.get_success(d)) @@ -1448,7 +1693,11 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): """ other_user_token = self.login("user", "pass") - channel = self.make_request("GET", self.url, access_token=other_user_token,) + channel = self.make_request( + "GET", + self.url, + access_token=other_user_token, + ) self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) @@ -1458,7 +1707,11 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): Tests that a lookup for a user that does not exist returns an empty list """ url = "/_synapse/admin/v1/users/@unknown_person:test/joined_rooms" - channel = self.make_request("GET", url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(0, channel.json_body["total"]) @@ -1470,7 +1723,11 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): """ url = "/_synapse/admin/v1/users/@unknown_person:unknown_domain/joined_rooms" - channel = self.make_request("GET", url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(0, channel.json_body["total"]) @@ -1482,7 +1739,11 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): if user has no memberships """ # Get rooms - channel = self.make_request("GET", self.url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + self.url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(0, channel.json_body["total"]) @@ -1499,7 +1760,11 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): self.helper.create_room_as(self.other_user, tok=other_user_tok) # Get rooms - channel = self.make_request("GET", self.url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + self.url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(number_rooms, channel.json_body["total"]) @@ -1542,7 +1807,11 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): # Now get rooms url = "/_synapse/admin/v1/users/@joiner:remote_hs/joined_rooms" - channel = self.make_request("GET", url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(1, channel.json_body["total"]) @@ -1582,7 +1851,11 @@ class PushersRestTestCase(unittest.HomeserverTestCase): """ other_user_token = self.login("user", "pass") - channel = self.make_request("GET", self.url, access_token=other_user_token,) + channel = self.make_request( + "GET", + self.url, + access_token=other_user_token, + ) self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) @@ -1592,7 +1865,11 @@ class PushersRestTestCase(unittest.HomeserverTestCase): Tests that a lookup for a user that does not exist returns a 404 """ url = "/_synapse/admin/v1/users/@unknown_person:test/pushers" - channel = self.make_request("GET", url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) self.assertEqual(404, channel.code, msg=channel.json_body) self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"]) @@ -1603,7 +1880,11 @@ class PushersRestTestCase(unittest.HomeserverTestCase): """ url = "/_synapse/admin/v1/users/@unknown_person:unknown_domain/pushers" - channel = self.make_request("GET", url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual("Can only lookup local users", channel.json_body["error"]) @@ -1614,7 +1895,11 @@ class PushersRestTestCase(unittest.HomeserverTestCase): """ # Get pushers - channel = self.make_request("GET", self.url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + self.url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(0, channel.json_body["total"]) @@ -1641,7 +1926,11 @@ class PushersRestTestCase(unittest.HomeserverTestCase): ) # Get pushers - channel = self.make_request("GET", self.url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + self.url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(1, channel.json_body["total"]) @@ -1690,7 +1979,11 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): """ other_user_token = self.login("user", "pass") - channel = self.make_request("GET", self.url, access_token=other_user_token,) + channel = self.make_request( + "GET", + self.url, + access_token=other_user_token, + ) self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) @@ -1700,7 +1993,11 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): Tests that a lookup for a user that does not exist returns a 404 """ url = "/_synapse/admin/v1/users/@unknown_person:test/media" - channel = self.make_request("GET", url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) self.assertEqual(404, channel.code, msg=channel.json_body) self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"]) @@ -1711,7 +2008,11 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): """ url = "/_synapse/admin/v1/users/@unknown_person:unknown_domain/media" - channel = self.make_request("GET", url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url, + access_token=self.admin_user_tok, + ) self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual("Can only lookup local users", channel.json_body["error"]) @@ -1726,7 +2027,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self._create_media(other_user_tok, number_media) channel = self.make_request( - "GET", self.url + "?limit=5", access_token=self.admin_user_tok, + "GET", + self.url + "?limit=5", + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1745,7 +2048,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self._create_media(other_user_tok, number_media) channel = self.make_request( - "GET", self.url + "?from=5", access_token=self.admin_user_tok, + "GET", + self.url + "?from=5", + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1764,7 +2069,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self._create_media(other_user_tok, number_media) channel = self.make_request( - "GET", self.url + "?from=5&limit=10", access_token=self.admin_user_tok, + "GET", + self.url + "?from=5&limit=10", + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1779,7 +2086,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): """ channel = self.make_request( - "GET", self.url + "?limit=-5", access_token=self.admin_user_tok, + "GET", + self.url + "?limit=-5", + access_token=self.admin_user_tok, ) self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) @@ -1791,7 +2100,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): """ channel = self.make_request( - "GET", self.url + "?from=-5", access_token=self.admin_user_tok, + "GET", + self.url + "?from=-5", + access_token=self.admin_user_tok, ) self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) @@ -1809,7 +2120,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): # `next_token` does not appear # Number of results is the number of entries channel = self.make_request( - "GET", self.url + "?limit=20", access_token=self.admin_user_tok, + "GET", + self.url + "?limit=20", + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1820,7 +2133,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): # `next_token` does not appear # Number of max results is larger than the number of entries channel = self.make_request( - "GET", self.url + "?limit=21", access_token=self.admin_user_tok, + "GET", + self.url + "?limit=21", + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1831,7 +2146,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): # `next_token` does appear # Number of max results is smaller than the number of entries channel = self.make_request( - "GET", self.url + "?limit=19", access_token=self.admin_user_tok, + "GET", + self.url + "?limit=19", + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1843,7 +2160,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): # Set `from` to value of `next_token` for request remaining entries # `next_token` does not appear channel = self.make_request( - "GET", self.url + "?from=19", access_token=self.admin_user_tok, + "GET", + self.url + "?from=19", + access_token=self.admin_user_tok, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1857,7 +2176,11 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): if user has no media created """ - channel = self.make_request("GET", self.url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + self.url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(0, channel.json_body["total"]) @@ -1872,7 +2195,11 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): other_user_tok = self.login("user", "pass") self._create_media(other_user_tok, number_media) - channel = self.make_request("GET", self.url, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + self.url, + access_token=self.admin_user_tok, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(number_media, channel.json_body["total"]) @@ -1899,8 +2226,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): ) def _check_fields(self, content): - """Checks that all attributes are present in content - """ + """Checks that all attributes are present in content""" for m in content: self.assertIn("media_id", m) self.assertIn("media_type", m) @@ -1913,8 +2239,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): class UserTokenRestTestCase(unittest.HomeserverTestCase): - """Test for /_synapse/admin/v1/users/<user>/login - """ + """Test for /_synapse/admin/v1/users/<user>/login""" servlets = [ synapse.rest.admin.register_servlets, @@ -1945,16 +2270,14 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): return channel.json_body["access_token"] def test_no_auth(self): - """Try to login as a user without authentication. - """ + """Try to login as a user without authentication.""" channel = self.make_request("POST", self.url, b"{}") self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) def test_not_admin(self): - """Try to login as a user as a non-admin user. - """ + """Try to login as a user as a non-admin user.""" channel = self.make_request( "POST", self.url, b"{}", access_token=self.other_user_tok ) @@ -1962,8 +2285,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) def test_send_event(self): - """Test that sending event as a user works. - """ + """Test that sending event as a user works.""" # Create a room. room_id = self.helper.create_room_as(self.other_user, tok=self.other_user_tok) @@ -1977,8 +2299,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): self.assertEqual(event.sender, self.other_user) def test_devices(self): - """Tests that logging in as a user doesn't create a new device for them. - """ + """Tests that logging in as a user doesn't create a new device for them.""" # Login in as the user self._get_token() @@ -1992,8 +2313,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): self.assertEqual(len(channel.json_body["devices"]), 1) def test_logout(self): - """Test that calling `/logout` with the token works. - """ + """Test that calling `/logout` with the token works.""" # Login in as the user puppet_token = self._get_token() @@ -2083,8 +2403,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase): } ) def test_consent(self): - """Test that sending a message is not subject to the privacy policies. - """ + """Test that sending a message is not subject to the privacy policies.""" # Have the admin user accept the terms. self.get_success(self.store.user_set_consent_version(self.admin_user, "1.0")) @@ -2159,11 +2478,19 @@ class WhoisRestTestCase(unittest.HomeserverTestCase): self.register_user("user2", "pass") other_user2_token = self.login("user2", "pass") - channel = self.make_request("GET", self.url1, access_token=other_user2_token,) + channel = self.make_request( + "GET", + self.url1, + access_token=other_user2_token, + ) self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) - channel = self.make_request("GET", self.url2, access_token=other_user2_token,) + channel = self.make_request( + "GET", + self.url2, + access_token=other_user2_token, + ) self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) @@ -2174,11 +2501,19 @@ class WhoisRestTestCase(unittest.HomeserverTestCase): url1 = "/_synapse/admin/v1/whois/@unknown_person:unknown_domain" url2 = "/_matrix/client/r0/admin/whois/@unknown_person:unknown_domain" - channel = self.make_request("GET", url1, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url1, + access_token=self.admin_user_tok, + ) self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual("Can only whois a local user", channel.json_body["error"]) - channel = self.make_request("GET", url2, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + url2, + access_token=self.admin_user_tok, + ) self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual("Can only whois a local user", channel.json_body["error"]) @@ -2186,12 +2521,20 @@ class WhoisRestTestCase(unittest.HomeserverTestCase): """ The lookup should succeed for an admin. """ - channel = self.make_request("GET", self.url1, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + self.url1, + access_token=self.admin_user_tok, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(self.other_user, channel.json_body["user_id"]) self.assertIn("devices", channel.json_body) - channel = self.make_request("GET", self.url2, access_token=self.admin_user_tok,) + channel = self.make_request( + "GET", + self.url2, + access_token=self.admin_user_tok, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(self.other_user, channel.json_body["user_id"]) self.assertIn("devices", channel.json_body) @@ -2202,12 +2545,84 @@ class WhoisRestTestCase(unittest.HomeserverTestCase): """ other_user_token = self.login("user", "pass") - channel = self.make_request("GET", self.url1, access_token=other_user_token,) + channel = self.make_request( + "GET", + self.url1, + access_token=other_user_token, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(self.other_user, channel.json_body["user_id"]) self.assertIn("devices", channel.json_body) - channel = self.make_request("GET", self.url2, access_token=other_user_token,) + channel = self.make_request( + "GET", + self.url2, + access_token=other_user_token, + ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(self.other_user, channel.json_body["user_id"]) self.assertIn("devices", channel.json_body) + + +class ShadowBanRestTestCase(unittest.HomeserverTestCase): + + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self.store = hs.get_datastore() + + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + self.other_user = self.register_user("user", "pass") + + self.url = "/_synapse/admin/v1/users/%s/shadow_ban" % urllib.parse.quote( + self.other_user + ) + + def test_no_auth(self): + """ + Try to get information of an user without authentication. + """ + channel = self.make_request("POST", self.url) + self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) + + def test_requester_is_not_admin(self): + """ + If the user is not a server admin, an error is returned. + """ + other_user_token = self.login("user", "pass") + + channel = self.make_request("POST", self.url, access_token=other_user_token) + self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) + + def test_user_is_not_local(self): + """ + Tests that shadow-banning for a user that is not a local returns a 400 + """ + url = "/_synapse/admin/v1/whois/@unknown_person:unknown_domain" + + channel = self.make_request("POST", url, access_token=self.admin_user_tok) + self.assertEqual(400, channel.code, msg=channel.json_body) + + def test_success(self): + """ + Shadow-banning should succeed for an admin. + """ + # The user starts off as not shadow-banned. + other_user_token = self.login("user", "pass") + result = self.get_success(self.store.get_user_by_access_token(other_user_token)) + self.assertFalse(result.shadow_banned) + + channel = self.make_request("POST", self.url, access_token=self.admin_user_tok) + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual({}, channel.json_body) + + # Ensure the user is shadow-banned (and the cache was cleared). + result = self.get_success(self.store.get_user_by_access_token(other_user_token)) + self.assertTrue(result.shadow_banned)