diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py
index af849bd471..3adadcb46b 100644
--- a/tests/rest/admin/test_admin.py
+++ b/tests/rest/admin/test_admin.py
@@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
import os
import urllib.parse
+from http import HTTPStatus
from unittest.mock import Mock
from twisted.internet.defer import Deferred
@@ -41,7 +41,7 @@ class VersionTestCase(unittest.HomeserverTestCase):
def test_version_string(self):
channel = self.make_request("GET", self.url, shorthand=False)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(
{"server_version", "python_version"}, set(channel.json_body.keys())
)
@@ -70,11 +70,11 @@ class DeleteGroupTestCase(unittest.HomeserverTestCase):
content={"localpart": "test"},
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
group_id = channel.json_body["group_id"]
- self._check_group(group_id, expect_code=200)
+ self._check_group(group_id, expect_code=HTTPStatus.OK)
# Invite/join another user
@@ -82,13 +82,13 @@ class DeleteGroupTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"PUT", url.encode("ascii"), access_token=self.admin_user_tok, content={}
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
url = "/groups/%s/self/accept_invite" % (group_id,)
channel = self.make_request(
"PUT", url.encode("ascii"), access_token=self.other_user_token, content={}
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Check other user knows they're in the group
self.assertIn(group_id, self._get_groups_user_is_in(self.admin_user_tok))
@@ -103,10 +103,10 @@ class DeleteGroupTestCase(unittest.HomeserverTestCase):
content={"localpart": "test"},
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
- # Check group returns 404
- self._check_group(group_id, expect_code=404)
+ # Check group returns HTTPStatus.NOT_FOUND
+ self._check_group(group_id, expect_code=HTTPStatus.NOT_FOUND)
# Check users don't think they're in the group
self.assertNotIn(group_id, self._get_groups_user_is_in(self.admin_user_tok))
@@ -122,15 +122,13 @@ class DeleteGroupTestCase(unittest.HomeserverTestCase):
"GET", url.encode("ascii"), access_token=self.admin_user_tok
)
- self.assertEqual(
- expect_code, int(channel.result["code"]), msg=channel.result["body"]
- )
+ self.assertEqual(expect_code, channel.code, msg=channel.json_body)
def _get_groups_user_is_in(self, access_token):
"""Returns the list of groups the user is in (given their access token)"""
channel = self.make_request("GET", b"/joined_groups", access_token=access_token)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
return channel.json_body["groups"]
@@ -210,10 +208,10 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
# Should be quarantined
self.assertEqual(
- 404,
- int(channel.code),
+ HTTPStatus.NOT_FOUND,
+ channel.code,
msg=(
- "Expected to receive a 404 on accessing quarantined media: %s"
+ "Expected to receive a HTTPStatus.NOT_FOUND on accessing quarantined media: %s"
% server_and_media_id
),
)
@@ -232,8 +230,8 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
# Expect a forbidden error
self.assertEqual(
- 403,
- int(channel.result["code"]),
+ HTTPStatus.FORBIDDEN,
+ channel.code,
msg="Expected forbidden on quarantining media as a non-admin",
)
@@ -247,8 +245,8 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
# Expect a forbidden error
self.assertEqual(
- 403,
- int(channel.result["code"]),
+ HTTPStatus.FORBIDDEN,
+ channel.code,
msg="Expected forbidden on quarantining media as a non-admin",
)
@@ -279,7 +277,7 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
)
# Should be successful
- self.assertEqual(200, int(channel.code), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code)
# Quarantine the media
url = "/_synapse/admin/v1/media/quarantine/%s/%s" % (
@@ -292,7 +290,7 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
access_token=admin_user_tok,
)
self.pump(1.0)
- self.assertEqual(200, int(channel.code), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Attempt to access the media
self._ensure_quarantined(admin_user_tok, server_name_and_media_id)
@@ -348,11 +346,9 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
access_token=admin_user_tok,
)
self.pump(1.0)
- self.assertEqual(200, int(channel.code), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(
- json.loads(channel.result["body"].decode("utf-8")),
- {"num_quarantined": 2},
- "Expected 2 quarantined items",
+ channel.json_body, {"num_quarantined": 2}, "Expected 2 quarantined items"
)
# Convert mxc URLs to server/media_id strings
@@ -396,11 +392,9 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
access_token=admin_user_tok,
)
self.pump(1.0)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(
- json.loads(channel.result["body"].decode("utf-8")),
- {"num_quarantined": 2},
- "Expected 2 quarantined items",
+ channel.json_body, {"num_quarantined": 2}, "Expected 2 quarantined items"
)
# Attempt to access each piece of media
@@ -432,7 +426,7 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
url = "/_synapse/admin/v1/media/protect/%s" % (urllib.parse.quote(media_id_2),)
channel = self.make_request("POST", url, access_token=admin_user_tok)
self.pump(1.0)
- self.assertEqual(200, int(channel.code), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Quarantine all media by this user
url = "/_synapse/admin/v1/user/%s/media/quarantine" % urllib.parse.quote(
@@ -444,11 +438,9 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
access_token=admin_user_tok,
)
self.pump(1.0)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(
- json.loads(channel.result["body"].decode("utf-8")),
- {"num_quarantined": 1},
- "Expected 1 quarantined item",
+ channel.json_body, {"num_quarantined": 1}, "Expected 1 quarantined item"
)
# Attempt to access each piece of media, the first should fail, the
@@ -467,10 +459,10 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
# Shouldn't be quarantined
self.assertEqual(
- 200,
- int(channel.code),
+ HTTPStatus.OK,
+ channel.code,
msg=(
- "Expected to receive a 200 on accessing not-quarantined media: %s"
+ "Expected to receive a HTTPStatus.OK on accessing not-quarantined media: %s"
% server_and_media_id_2
),
)
@@ -499,7 +491,7 @@ class PurgeHistoryTestCase(unittest.HomeserverTestCase):
def test_purge_history(self):
"""
Simple test of purge history API.
- Test only that is is possible to call, get status 200 and purge_id.
+ Test only that is is possible to call, get status HTTPStatus.OK and purge_id.
"""
channel = self.make_request(
@@ -509,7 +501,7 @@ class PurgeHistoryTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertIn("purge_id", channel.json_body)
purge_id = channel.json_body["purge_id"]
@@ -520,5 +512,5 @@ class PurgeHistoryTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("complete", channel.json_body["status"])
diff --git a/tests/rest/admin/test_background_updates.py b/tests/rest/admin/test_background_updates.py
index cd5c60b65c..4d152c0d66 100644
--- a/tests/rest/admin/test_background_updates.py
+++ b/tests/rest/admin/test_background_updates.py
@@ -16,11 +16,14 @@ from typing import Collection
from parameterized import parameterized
+from twisted.test.proto_helpers import MemoryReactor
+
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login
from synapse.server import HomeServer
from synapse.storage.background_updates import BackgroundUpdater
+from synapse.util import Clock
from tests import unittest
@@ -31,7 +34,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs: HomeServer):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -44,9 +47,9 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
("POST", "/_synapse/admin/v1/background_updates/start_job"),
]
)
- def test_requester_is_no_admin(self, method: str, url: str):
+ def test_requester_is_no_admin(self, method: str, url: str) -> None:
"""
- If the user is not a server admin, an error 403 is returned.
+ If the user is not a server admin, an error HTTPStatus.FORBIDDEN is returned.
"""
self.register_user("user", "pass", admin=False)
@@ -62,7 +65,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
@@ -90,7 +93,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
- def _register_bg_update(self):
+ def _register_bg_update(self) -> None:
"Adds a bg update but doesn't start it"
async def _fake_update(progress, batch_size) -> int:
@@ -112,7 +115,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
)
)
- def test_status_empty(self):
+ def test_status_empty(self) -> None:
"""Test the status API works."""
channel = self.make_request(
@@ -127,7 +130,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
channel.json_body, {"current_updates": {}, "enabled": True}
)
- def test_status_bg_update(self):
+ def test_status_bg_update(self) -> None:
"""Test the status API works with a background update."""
# Create a new background update
@@ -135,7 +138,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
self._register_bg_update()
self.store.db_pool.updates.start_doing_background_updates()
- self.reactor.pump([1.0, 1.0])
+ self.reactor.pump([1.0, 1.0, 1.0])
channel = self.make_request(
"GET",
@@ -162,7 +165,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
},
)
- def test_enabled(self):
+ def test_enabled(self) -> None:
"""Test the enabled API works."""
# Create a new background update
@@ -299,7 +302,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
),
]
)
- def test_start_backround_job(self, job_name: str, updates: Collection[str]):
+ def test_start_backround_job(self, job_name: str, updates: Collection[str]) -> None:
"""
Test that background updates add to database and be processed.
@@ -341,7 +344,7 @@ class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
)
)
- def test_start_backround_job_twice(self):
+ def test_start_backround_job_twice(self) -> None:
"""Test that add a background update twice return an error."""
# add job to database
diff --git a/tests/rest/admin/test_device.py b/tests/rest/admin/test_device.py
index a3679be205..f7080bda87 100644
--- a/tests/rest/admin/test_device.py
+++ b/tests/rest/admin/test_device.py
@@ -11,14 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import urllib.parse
+from http import HTTPStatus
from parameterized import parameterized
+from twisted.test.proto_helpers import MemoryReactor
+
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login
+from synapse.server import HomeServer
+from synapse.util import Clock
from tests import unittest
@@ -30,7 +34,7 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.handler = hs.get_device_handler()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -47,17 +51,21 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
)
@parameterized.expand(["GET", "PUT", "DELETE"])
- def test_no_auth(self, method: str):
+ def test_no_auth(self, method: str) -> None:
"""
Try to get a device of an user without authentication.
"""
channel = self.make_request(method, self.url, b"{}")
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "PUT", "DELETE"])
- def test_requester_is_no_admin(self, method: str):
+ def test_requester_is_no_admin(self, method: str) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -67,13 +75,17 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.other_user_token,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "PUT", "DELETE"])
- def test_user_does_not_exist(self, method: str):
+ def test_user_does_not_exist(self, method: str) -> None:
"""
- Tests that a lookup for a user that does not exist returns a 404
+ Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
url = (
"/_synapse/admin/v2/users/@unknown_person:test/devices/%s"
@@ -86,13 +98,13 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
@parameterized.expand(["GET", "PUT", "DELETE"])
- def test_user_is_not_local(self, method: str):
+ def test_user_is_not_local(self, method: str) -> None:
"""
- Tests that a lookup for a user that is not a local returns a 400
+ Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
url = (
"/_synapse/admin/v2/users/@unknown_person:unknown_domain/devices/%s"
@@ -105,12 +117,12 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only lookup local users", channel.json_body["error"])
- def test_unknown_device(self):
+ def test_unknown_device(self) -> None:
"""
- Tests that a lookup for a device that does not exist returns either 404 or 200.
+ Tests that a lookup for a device that does not exist returns either HTTPStatus.NOT_FOUND or HTTPStatus.OK.
"""
url = "/_synapse/admin/v2/users/%s/devices/unknown_device" % urllib.parse.quote(
self.other_user
@@ -122,7 +134,7 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
channel = self.make_request(
@@ -131,7 +143,7 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
channel = self.make_request(
"DELETE",
@@ -139,10 +151,10 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- # Delete unknown device returns status 200
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ # Delete unknown device returns status HTTPStatus.OK
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
- def test_update_device_too_long_display_name(self):
+ def test_update_device_too_long_display_name(self) -> None:
"""
Update a device with a display name that is invalid (too long).
"""
@@ -167,7 +179,7 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
content=update,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.TOO_LARGE, channel.json_body["errcode"])
# Ensure the display name was not updated.
@@ -177,12 +189,12 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("new display", channel.json_body["display_name"])
- def test_update_no_display_name(self):
+ def test_update_no_display_name(self) -> None:
"""
- Tests that a update for a device without JSON returns a 200
+ Tests that a update for a device without JSON returns a HTTPStatus.OK
"""
# Set iniital display name.
update = {"display_name": "new display"}
@@ -198,7 +210,7 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Ensure the display name was not updated.
channel = self.make_request(
@@ -207,10 +219,10 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("new display", channel.json_body["display_name"])
- def test_update_display_name(self):
+ def test_update_display_name(self) -> None:
"""
Tests a normal successful update of display name
"""
@@ -222,7 +234,7 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
content={"display_name": "new displayname"},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Check new display_name
channel = self.make_request(
@@ -231,10 +243,10 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("new displayname", channel.json_body["display_name"])
- def test_get_device(self):
+ def test_get_device(self) -> None:
"""
Tests that a normal lookup for a device is successfully
"""
@@ -244,7 +256,7 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(self.other_user, channel.json_body["user_id"])
# Check that all fields are available
self.assertIn("user_id", channel.json_body)
@@ -253,7 +265,7 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
self.assertIn("last_seen_ip", channel.json_body)
self.assertIn("last_seen_ts", channel.json_body)
- def test_delete_device(self):
+ def test_delete_device(self) -> None:
"""
Tests that a remove of a device is successfully
"""
@@ -269,7 +281,7 @@ class DeviceRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Ensure that the number of devices is decreased
res = self.get_success(self.handler.get_devices_by_user(self.other_user))
@@ -283,7 +295,7 @@ class DevicesRestTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -293,16 +305,20 @@ class DevicesRestTestCase(unittest.HomeserverTestCase):
self.other_user
)
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to list devices of an user without authentication.
"""
channel = self.make_request("GET", self.url, b"{}")
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -314,12 +330,16 @@ class DevicesRestTestCase(unittest.HomeserverTestCase):
access_token=other_user_token,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_user_does_not_exist(self):
+ def test_user_does_not_exist(self) -> None:
"""
- Tests that a lookup for a user that does not exist returns a 404
+ Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
url = "/_synapse/admin/v2/users/@unknown_person:test/devices"
channel = self.make_request(
@@ -328,12 +348,12 @@ class DevicesRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
- def test_user_is_not_local(self):
+ def test_user_is_not_local(self) -> None:
"""
- Tests that a lookup for a user that is not a local returns a 400
+ Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
url = "/_synapse/admin/v2/users/@unknown_person:unknown_domain/devices"
@@ -343,10 +363,10 @@ class DevicesRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only lookup local users", channel.json_body["error"])
- def test_user_has_no_devices(self):
+ def test_user_has_no_devices(self) -> None:
"""
Tests that a normal lookup for devices is successfully
if user has no devices
@@ -359,11 +379,11 @@ class DevicesRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["devices"]))
- def test_get_devices(self):
+ def test_get_devices(self) -> None:
"""
Tests that a normal lookup for devices is successfully
"""
@@ -379,7 +399,7 @@ class DevicesRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(number_devices, channel.json_body["total"])
self.assertEqual(number_devices, len(channel.json_body["devices"]))
self.assertEqual(self.other_user, channel.json_body["devices"][0]["user_id"])
@@ -399,7 +419,7 @@ class DeleteDevicesRestTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.handler = hs.get_device_handler()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -411,16 +431,20 @@ class DeleteDevicesRestTestCase(unittest.HomeserverTestCase):
self.other_user
)
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to delete devices of an user without authentication.
"""
channel = self.make_request("POST", self.url, b"{}")
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -432,12 +456,16 @@ class DeleteDevicesRestTestCase(unittest.HomeserverTestCase):
access_token=other_user_token,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_user_does_not_exist(self):
+ def test_user_does_not_exist(self) -> None:
"""
- Tests that a lookup for a user that does not exist returns a 404
+ Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
url = "/_synapse/admin/v2/users/@unknown_person:test/delete_devices"
channel = self.make_request(
@@ -446,12 +474,12 @@ class DeleteDevicesRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
- def test_user_is_not_local(self):
+ def test_user_is_not_local(self) -> None:
"""
- Tests that a lookup for a user that is not a local returns a 400
+ Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
url = "/_synapse/admin/v2/users/@unknown_person:unknown_domain/delete_devices"
@@ -461,12 +489,12 @@ class DeleteDevicesRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only lookup local users", channel.json_body["error"])
- def test_unknown_devices(self):
+ def test_unknown_devices(self) -> None:
"""
- Tests that a remove of a device that does not exist returns 200.
+ Tests that a remove of a device that does not exist returns HTTPStatus.OK.
"""
channel = self.make_request(
"POST",
@@ -475,10 +503,10 @@ class DeleteDevicesRestTestCase(unittest.HomeserverTestCase):
content={"devices": ["unknown_device1", "unknown_device2"]},
)
- # Delete unknown devices returns status 200
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ # Delete unknown devices returns status HTTPStatus.OK
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
- def test_delete_devices(self):
+ def test_delete_devices(self) -> None:
"""
Tests that a remove of devices is successfully
"""
@@ -505,7 +533,7 @@ class DeleteDevicesRestTestCase(unittest.HomeserverTestCase):
content={"devices": device_ids},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
res = self.get_success(self.handler.get_devices_by_user(self.other_user))
self.assertEqual(0, len(res))
diff --git a/tests/rest/admin/test_event_reports.py b/tests/rest/admin/test_event_reports.py
index e9ef89731f..4f89f8b534 100644
--- a/tests/rest/admin/test_event_reports.py
+++ b/tests/rest/admin/test_event_reports.py
@@ -11,12 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from http import HTTPStatus
+from typing import List
-import json
+from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login, report_event, room
+from synapse.server import HomeServer
+from synapse.types import JsonDict
+from synapse.util import Clock
from tests import unittest
@@ -29,7 +34,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
report_event.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -70,18 +75,22 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
self.url = "/_synapse/admin/v1/event_reports"
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to get an event report without authentication.
"""
channel = self.make_request("GET", self.url, b"{}")
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
- If the user is not a server admin, an error 403 is returned.
+ If the user is not a server admin, an error HTTPStatus.FORBIDDEN is returned.
"""
channel = self.make_request(
@@ -90,10 +99,14 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.other_user_tok,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_default_success(self):
+ def test_default_success(self) -> None:
"""
Testing list of reported events
"""
@@ -104,13 +117,13 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(len(channel.json_body["event_reports"]), 20)
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["event_reports"])
- def test_limit(self):
+ def test_limit(self) -> None:
"""
Testing list of reported events with limit
"""
@@ -121,13 +134,13 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(len(channel.json_body["event_reports"]), 5)
self.assertEqual(channel.json_body["next_token"], 5)
self._check_fields(channel.json_body["event_reports"])
- def test_from(self):
+ def test_from(self) -> None:
"""
Testing list of reported events with a defined starting point (from)
"""
@@ -138,13 +151,13 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(len(channel.json_body["event_reports"]), 15)
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["event_reports"])
- def test_limit_and_from(self):
+ def test_limit_and_from(self) -> None:
"""
Testing list of reported events with a defined starting point and limit
"""
@@ -155,13 +168,13 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(channel.json_body["next_token"], 15)
self.assertEqual(len(channel.json_body["event_reports"]), 10)
self._check_fields(channel.json_body["event_reports"])
- def test_filter_room(self):
+ def test_filter_room(self) -> None:
"""
Testing list of reported events with a filter of room
"""
@@ -172,7 +185,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 10)
self.assertEqual(len(channel.json_body["event_reports"]), 10)
self.assertNotIn("next_token", channel.json_body)
@@ -181,7 +194,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
for report in channel.json_body["event_reports"]:
self.assertEqual(report["room_id"], self.room_id1)
- def test_filter_user(self):
+ def test_filter_user(self) -> None:
"""
Testing list of reported events with a filter of user
"""
@@ -192,7 +205,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 10)
self.assertEqual(len(channel.json_body["event_reports"]), 10)
self.assertNotIn("next_token", channel.json_body)
@@ -201,7 +214,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
for report in channel.json_body["event_reports"]:
self.assertEqual(report["user_id"], self.other_user)
- def test_filter_user_and_room(self):
+ def test_filter_user_and_room(self) -> None:
"""
Testing list of reported events with a filter of user and room
"""
@@ -212,7 +225,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 5)
self.assertEqual(len(channel.json_body["event_reports"]), 5)
self.assertNotIn("next_token", channel.json_body)
@@ -222,7 +235,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
self.assertEqual(report["user_id"], self.other_user)
self.assertEqual(report["room_id"], self.room_id1)
- def test_valid_search_order(self):
+ def test_valid_search_order(self) -> None:
"""
Testing search order. Order by timestamps.
"""
@@ -234,7 +247,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(len(channel.json_body["event_reports"]), 20)
report = 1
@@ -252,7 +265,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(len(channel.json_body["event_reports"]), 20)
report = 1
@@ -263,9 +276,9 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
)
report += 1
- def test_invalid_search_order(self):
+ def test_invalid_search_order(self) -> None:
"""
- Testing that a invalid search order returns a 400
+ Testing that a invalid search order returns a HTTPStatus.BAD_REQUEST
"""
channel = self.make_request(
@@ -274,13 +287,17 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
self.assertEqual("Unknown direction: bar", channel.json_body["error"])
- def test_limit_is_negative(self):
+ def test_limit_is_negative(self) -> None:
"""
- Testing that a negative limit parameter returns a 400
+ Testing that a negative limit parameter returns a HTTPStatus.BAD_REQUEST
"""
channel = self.make_request(
@@ -289,12 +306,16 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
- def test_from_is_negative(self):
+ def test_from_is_negative(self) -> None:
"""
- Testing that a negative from parameter returns a 400
+ Testing that a negative from parameter returns a HTTPStatus.BAD_REQUEST
"""
channel = self.make_request(
@@ -303,10 +324,14 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
- def test_next_token(self):
+ def test_next_token(self) -> None:
"""
Testing that `next_token` appears at the right place
"""
@@ -319,7 +344,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(len(channel.json_body["event_reports"]), 20)
self.assertNotIn("next_token", channel.json_body)
@@ -332,7 +357,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(len(channel.json_body["event_reports"]), 20)
self.assertNotIn("next_token", channel.json_body)
@@ -345,7 +370,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(len(channel.json_body["event_reports"]), 19)
self.assertEqual(channel.json_body["next_token"], 19)
@@ -359,12 +384,12 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(len(channel.json_body["event_reports"]), 1)
self.assertNotIn("next_token", channel.json_body)
- def _create_event_and_report(self, room_id, user_tok):
+ def _create_event_and_report(self, room_id: str, user_tok: str) -> None:
"""Create and report events"""
resp = self.helper.send(room_id, tok=user_tok)
event_id = resp["event_id"]
@@ -372,12 +397,14 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"POST",
"rooms/%s/report/%s" % (room_id, event_id),
- json.dumps({"score": -100, "reason": "this makes me sad"}),
+ {"score": -100, "reason": "this makes me sad"},
access_token=user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
- def _create_event_and_report_without_parameters(self, room_id, user_tok):
+ def _create_event_and_report_without_parameters(
+ self, room_id: str, user_tok: str
+ ) -> None:
"""Create and report an event, but omit reason and score"""
resp = self.helper.send(room_id, tok=user_tok)
event_id = resp["event_id"]
@@ -385,12 +412,12 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"POST",
"rooms/%s/report/%s" % (room_id, event_id),
- json.dumps({}),
+ {},
access_token=user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
- def _check_fields(self, content):
+ def _check_fields(self, content: List[JsonDict]) -> None:
"""Checks that all attributes are present in an event report"""
for c in content:
self.assertIn("id", c)
@@ -413,7 +440,7 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase):
report_event.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -433,18 +460,22 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase):
# first created event report gets `id`=2
self.url = "/_synapse/admin/v1/event_reports/2"
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to get event report without authentication.
"""
channel = self.make_request("GET", self.url, b"{}")
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
- If the user is not a server admin, an error 403 is returned.
+ If the user is not a server admin, an error HTTPStatus.FORBIDDEN is returned.
"""
channel = self.make_request(
@@ -453,10 +484,14 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase):
access_token=self.other_user_tok,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_default_success(self):
+ def test_default_success(self) -> None:
"""
Testing get a reported event
"""
@@ -467,12 +502,12 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self._check_fields(channel.json_body)
- def test_invalid_report_id(self):
+ def test_invalid_report_id(self) -> None:
"""
- Testing that an invalid `report_id` returns a 400.
+ Testing that an invalid `report_id` returns a HTTPStatus.BAD_REQUEST.
"""
# `report_id` is negative
@@ -482,7 +517,11 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
self.assertEqual(
"The report_id parameter must be a string representing a positive integer.",
@@ -496,7 +535,11 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
self.assertEqual(
"The report_id parameter must be a string representing a positive integer.",
@@ -510,16 +553,20 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
self.assertEqual(
"The report_id parameter must be a string representing a positive integer.",
channel.json_body["error"],
)
- def test_report_id_not_found(self):
+ def test_report_id_not_found(self) -> None:
"""
- Testing that a not existing `report_id` returns a 404.
+ Testing that a not existing `report_id` returns a HTTPStatus.NOT_FOUND.
"""
channel = self.make_request(
@@ -528,11 +575,15 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.NOT_FOUND,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
self.assertEqual("Event report not found", channel.json_body["error"])
- def _create_event_and_report(self, room_id, user_tok):
+ def _create_event_and_report(self, room_id: str, user_tok: str) -> None:
"""Create and report events"""
resp = self.helper.send(room_id, tok=user_tok)
event_id = resp["event_id"]
@@ -540,12 +591,12 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"POST",
"rooms/%s/report/%s" % (room_id, event_id),
- json.dumps({"score": -100, "reason": "this makes me sad"}),
+ {"score": -100, "reason": "this makes me sad"},
access_token=user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
- def _check_fields(self, content):
+ def _check_fields(self, content: JsonDict) -> None:
"""Checks that all attributes are present in a event report"""
self.assertIn("id", content)
self.assertIn("received_ts", content)
diff --git a/tests/rest/admin/test_federation.py b/tests/rest/admin/test_federation.py
new file mode 100644
index 0000000000..5188499ef2
--- /dev/null
+++ b/tests/rest/admin/test_federation.py
@@ -0,0 +1,456 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from http import HTTPStatus
+from typing import List, Optional
+
+from parameterized import parameterized
+
+import synapse.rest.admin
+from synapse.api.errors import Codes
+from synapse.rest.client import login
+from synapse.server import HomeServer
+from synapse.types import JsonDict
+
+from tests import unittest
+
+
+class FederationTestCase(unittest.HomeserverTestCase):
+ servlets = [
+ synapse.rest.admin.register_servlets,
+ login.register_servlets,
+ ]
+
+ def prepare(self, reactor, clock, hs: HomeServer):
+ self.store = hs.get_datastore()
+ self.register_user("admin", "pass", admin=True)
+ self.admin_user_tok = self.login("admin", "pass")
+
+ self.url = "/_synapse/admin/v1/federation/destinations"
+
+ @parameterized.expand(
+ [
+ ("/_synapse/admin/v1/federation/destinations",),
+ ("/_synapse/admin/v1/federation/destinations/dummy",),
+ ]
+ )
+ def test_requester_is_no_admin(self, url: str):
+ """
+ If the user is not a server admin, an error 403 is returned.
+ """
+
+ self.register_user("user", "pass", admin=False)
+ other_user_tok = self.login("user", "pass")
+
+ channel = self.make_request(
+ "GET",
+ url,
+ content={},
+ access_token=other_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+ def test_invalid_parameter(self):
+ """
+ If parameters are invalid, an error is returned.
+ """
+
+ # negative limit
+ channel = self.make_request(
+ "GET",
+ self.url + "?limit=-5",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+ # negative from
+ channel = self.make_request(
+ "GET",
+ self.url + "?from=-5",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+ # unkown order_by
+ channel = self.make_request(
+ "GET",
+ self.url + "?order_by=bar",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
+
+ # invalid search order
+ channel = self.make_request(
+ "GET",
+ self.url + "?dir=bar",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
+
+ # invalid destination
+ channel = self.make_request(
+ "GET",
+ self.url + "/dummy",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
+
+ def test_limit(self):
+ """
+ Testing list of destinations with limit
+ """
+
+ number_destinations = 20
+ self._create_destinations(number_destinations)
+
+ channel = self.make_request(
+ "GET",
+ self.url + "?limit=5",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_destinations)
+ self.assertEqual(len(channel.json_body["destinations"]), 5)
+ self.assertEqual(channel.json_body["next_token"], "5")
+ self._check_fields(channel.json_body["destinations"])
+
+ def test_from(self):
+ """
+ Testing list of destinations with a defined starting point (from)
+ """
+
+ number_destinations = 20
+ self._create_destinations(number_destinations)
+
+ channel = self.make_request(
+ "GET",
+ self.url + "?from=5",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_destinations)
+ self.assertEqual(len(channel.json_body["destinations"]), 15)
+ self.assertNotIn("next_token", channel.json_body)
+ self._check_fields(channel.json_body["destinations"])
+
+ def test_limit_and_from(self):
+ """
+ Testing list of destinations with a defined starting point and limit
+ """
+
+ number_destinations = 20
+ self._create_destinations(number_destinations)
+
+ channel = self.make_request(
+ "GET",
+ self.url + "?from=5&limit=10",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_destinations)
+ self.assertEqual(channel.json_body["next_token"], "15")
+ self.assertEqual(len(channel.json_body["destinations"]), 10)
+ self._check_fields(channel.json_body["destinations"])
+
+ def test_next_token(self):
+ """
+ Testing that `next_token` appears at the right place
+ """
+
+ number_destinations = 20
+ self._create_destinations(number_destinations)
+
+ # `next_token` does not appear
+ # Number of results is the number of entries
+ channel = self.make_request(
+ "GET",
+ self.url + "?limit=20",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_destinations)
+ self.assertEqual(len(channel.json_body["destinations"]), number_destinations)
+ self.assertNotIn("next_token", channel.json_body)
+
+ # `next_token` does not appear
+ # Number of max results is larger than the number of entries
+ channel = self.make_request(
+ "GET",
+ self.url + "?limit=21",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_destinations)
+ self.assertEqual(len(channel.json_body["destinations"]), number_destinations)
+ self.assertNotIn("next_token", channel.json_body)
+
+ # `next_token` does appear
+ # Number of max results is smaller than the number of entries
+ channel = self.make_request(
+ "GET",
+ self.url + "?limit=19",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_destinations)
+ self.assertEqual(len(channel.json_body["destinations"]), 19)
+ self.assertEqual(channel.json_body["next_token"], "19")
+
+ # Check
+ # Set `from` to value of `next_token` for request remaining entries
+ # `next_token` does not appear
+ channel = self.make_request(
+ "GET",
+ self.url + "?from=19",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_destinations)
+ self.assertEqual(len(channel.json_body["destinations"]), 1)
+ self.assertNotIn("next_token", channel.json_body)
+
+ def test_list_all_destinations(self):
+ """
+ List all destinations.
+ """
+ number_destinations = 5
+ self._create_destinations(number_destinations)
+
+ channel = self.make_request(
+ "GET",
+ self.url,
+ {},
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(number_destinations, len(channel.json_body["destinations"]))
+ self.assertEqual(number_destinations, channel.json_body["total"])
+
+ # Check that all fields are available
+ self._check_fields(channel.json_body["destinations"])
+
+ def test_order_by(self):
+ """
+ Testing order list with parameter `order_by`
+ """
+
+ def _order_test(
+ expected_destination_list: List[str],
+ order_by: Optional[str],
+ dir: Optional[str] = None,
+ ):
+ """Request the list of destinations in a certain order.
+ Assert that order is what we expect
+
+ Args:
+ expected_destination_list: The list of user_id in the order
+ we expect to get back from the server
+ order_by: The type of ordering to give the server
+ dir: The direction of ordering to give the server
+ """
+
+ url = f"{self.url}?"
+ if order_by is not None:
+ url += f"order_by={order_by}&"
+ if dir is not None and dir in ("b", "f"):
+ url += f"dir={dir}"
+ channel = self.make_request(
+ "GET",
+ url,
+ access_token=self.admin_user_tok,
+ )
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], len(expected_destination_list))
+
+ returned_order = [
+ row["destination"] for row in channel.json_body["destinations"]
+ ]
+ self.assertEqual(expected_destination_list, returned_order)
+ self._check_fields(channel.json_body["destinations"])
+
+ # create destinations
+ dest = [
+ ("sub-a.example.com", 100, 300, 200, 300),
+ ("sub-b.example.com", 200, 200, 100, 100),
+ ("sub-c.example.com", 300, 100, 300, 200),
+ ]
+ for (
+ destination,
+ failure_ts,
+ retry_last_ts,
+ retry_interval,
+ last_successful_stream_ordering,
+ ) in dest:
+ self.get_success(
+ self.store.set_destination_retry_timings(
+ destination, failure_ts, retry_last_ts, retry_interval
+ )
+ )
+ self.get_success(
+ self.store.set_destination_last_successful_stream_ordering(
+ destination, last_successful_stream_ordering
+ )
+ )
+
+ # order by default (destination)
+ _order_test([dest[0][0], dest[1][0], dest[2][0]], None)
+ _order_test([dest[0][0], dest[1][0], dest[2][0]], None, "f")
+ _order_test([dest[2][0], dest[1][0], dest[0][0]], None, "b")
+
+ # order by destination
+ _order_test([dest[0][0], dest[1][0], dest[2][0]], "destination")
+ _order_test([dest[0][0], dest[1][0], dest[2][0]], "destination", "f")
+ _order_test([dest[2][0], dest[1][0], dest[0][0]], "destination", "b")
+
+ # order by failure_ts
+ _order_test([dest[0][0], dest[1][0], dest[2][0]], "failure_ts")
+ _order_test([dest[0][0], dest[1][0], dest[2][0]], "failure_ts", "f")
+ _order_test([dest[2][0], dest[1][0], dest[0][0]], "failure_ts", "b")
+
+ # order by retry_last_ts
+ _order_test([dest[2][0], dest[1][0], dest[0][0]], "retry_last_ts")
+ _order_test([dest[2][0], dest[1][0], dest[0][0]], "retry_last_ts", "f")
+ _order_test([dest[0][0], dest[1][0], dest[2][0]], "retry_last_ts", "b")
+
+ # order by retry_interval
+ _order_test([dest[1][0], dest[0][0], dest[2][0]], "retry_interval")
+ _order_test([dest[1][0], dest[0][0], dest[2][0]], "retry_interval", "f")
+ _order_test([dest[2][0], dest[0][0], dest[1][0]], "retry_interval", "b")
+
+ # order by last_successful_stream_ordering
+ _order_test(
+ [dest[1][0], dest[2][0], dest[0][0]], "last_successful_stream_ordering"
+ )
+ _order_test(
+ [dest[1][0], dest[2][0], dest[0][0]], "last_successful_stream_ordering", "f"
+ )
+ _order_test(
+ [dest[0][0], dest[2][0], dest[1][0]], "last_successful_stream_ordering", "b"
+ )
+
+ def test_search_term(self):
+ """Test that searching for a destination works correctly"""
+
+ def _search_test(
+ expected_destination: Optional[str],
+ search_term: str,
+ ):
+ """Search for a destination and check that the returned destinationis a match
+
+ Args:
+ expected_destination: The room_id expected to be returned by the API.
+ Set to None to expect zero results for the search
+ search_term: The term to search for room names with
+ """
+ url = f"{self.url}?destination={search_term}"
+ channel = self.make_request(
+ "GET",
+ url.encode("ascii"),
+ access_token=self.admin_user_tok,
+ )
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+
+ # Check that destinations were returned
+ self.assertTrue("destinations" in channel.json_body)
+ self._check_fields(channel.json_body["destinations"])
+ destinations = channel.json_body["destinations"]
+
+ # Check that the expected number of destinations were returned
+ expected_destination_count = 1 if expected_destination else 0
+ self.assertEqual(len(destinations), expected_destination_count)
+ self.assertEqual(channel.json_body["total"], expected_destination_count)
+
+ if expected_destination:
+ # Check that the first returned destination is correct
+ self.assertEqual(expected_destination, destinations[0]["destination"])
+
+ number_destinations = 3
+ self._create_destinations(number_destinations)
+
+ # Test searching
+ _search_test("sub0.example.com", "0")
+ _search_test("sub0.example.com", "sub0")
+
+ _search_test("sub1.example.com", "1")
+ _search_test("sub1.example.com", "1.")
+
+ # Test case insensitive
+ _search_test("sub0.example.com", "SUB0")
+
+ _search_test(None, "foo")
+ _search_test(None, "bar")
+
+ def test_get_single_destination(self):
+ """
+ Get one specific destinations.
+ """
+ self._create_destinations(5)
+
+ channel = self.make_request(
+ "GET",
+ self.url + "/sub0.example.com",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual("sub0.example.com", channel.json_body["destination"])
+
+ # Check that all fields are available
+ # convert channel.json_body into a List
+ self._check_fields([channel.json_body])
+
+ def _create_destinations(self, number_destinations: int):
+ """Create a number of destinations
+
+ Args:
+ number_destinations: Number of destinations to be created
+ """
+ for i in range(0, number_destinations):
+ dest = f"sub{i}.example.com"
+ self.get_success(self.store.set_destination_retry_timings(dest, 50, 50, 50))
+ self.get_success(
+ self.store.set_destination_last_successful_stream_ordering(dest, 100)
+ )
+
+ def _check_fields(self, content: List[JsonDict]):
+ """Checks that the expected destination attributes are present in content
+
+ Args:
+ content: List that is checked for content
+ """
+ for c in content:
+ self.assertIn("destination", c)
+ self.assertIn("retry_last_ts", c)
+ self.assertIn("retry_interval", c)
+ self.assertIn("failure_ts", c)
+ self.assertIn("last_successful_stream_ordering", c)
diff --git a/tests/rest/admin/test_media.py b/tests/rest/admin/test_media.py
index db0e78c039..81e578fd26 100644
--- a/tests/rest/admin/test_media.py
+++ b/tests/rest/admin/test_media.py
@@ -12,16 +12,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import json
import os
+from http import HTTPStatus
from parameterized import parameterized
+from twisted.test.proto_helpers import MemoryReactor
+
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login, profile, room
from synapse.rest.media.v1.filepath import MediaFilePaths
+from synapse.server import HomeServer
+from synapse.util import Clock
from tests import unittest
from tests.server import FakeSite, make_request
@@ -39,7 +42,7 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.media_repo = hs.get_media_repository_resource()
self.server_name = hs.hostname
@@ -48,7 +51,7 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase):
self.filepaths = MediaFilePaths(hs.config.media.media_store_path)
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to delete media without authentication.
"""
@@ -56,10 +59,14 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase):
channel = self.make_request("DELETE", url, b"{}")
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -74,12 +81,16 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.other_user_token,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_media_does_not_exist(self):
+ def test_media_does_not_exist(self) -> None:
"""
- Tests that a lookup for a media that does not exist returns a 404
+ Tests that a lookup for a media that does not exist returns a HTTPStatus.NOT_FOUND
"""
url = "/_synapse/admin/v1/media/%s/%s" % (self.server_name, "12345")
@@ -89,12 +100,12 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
- def test_media_is_not_local(self):
+ def test_media_is_not_local(self) -> None:
"""
- Tests that a lookup for a media that is not a local returns a 400
+ Tests that a lookup for a media that is not a local returns a HTTPStatus.BAD_REQUEST
"""
url = "/_synapse/admin/v1/media/%s/%s" % ("unknown_domain", "12345")
@@ -104,10 +115,10 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only delete local media", channel.json_body["error"])
- def test_delete_media(self):
+ def test_delete_media(self) -> None:
"""
Tests that delete a media is successfully
"""
@@ -117,7 +128,10 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase):
# Upload some media into the room
response = self.helper.upload_media(
- upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200
+ upload_resource,
+ SMALL_PNG,
+ tok=self.admin_user_tok,
+ expect_code=HTTPStatus.OK,
)
# Extract media ID from the response
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
@@ -137,10 +151,11 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase):
# Should be successful
self.assertEqual(
- 200,
+ HTTPStatus.OK,
channel.code,
msg=(
- "Expected to receive a 200 on accessing media: %s" % server_and_media_id
+ "Expected to receive a HTTPStatus.OK on accessing media: %s"
+ % server_and_media_id
),
)
@@ -157,7 +172,7 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(1, channel.json_body["total"])
self.assertEqual(
media_id,
@@ -174,10 +189,10 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
self.assertEqual(
- 404,
+ HTTPStatus.NOT_FOUND,
channel.code,
msg=(
- "Expected to receive a 404 on accessing deleted media: %s"
+ "Expected to receive a HTTPStatus.NOT_FOUND on accessing deleted media: %s"
% server_and_media_id
),
)
@@ -196,7 +211,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
room.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.media_repo = hs.get_media_repository_resource()
self.server_name = hs.hostname
@@ -209,17 +224,21 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
# Move clock up to somewhat realistic time
self.reactor.advance(1000000000)
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to delete media without authentication.
"""
channel = self.make_request("POST", self.url, b"{}")
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -232,12 +251,16 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
access_token=self.other_user_token,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_media_is_not_local(self):
+ def test_media_is_not_local(self) -> None:
"""
- Tests that a lookup for media that is not local returns a 400
+ Tests that a lookup for media that is not local returns a HTTPStatus.BAD_REQUEST
"""
url = "/_synapse/admin/v1/media/%s/delete" % "unknown_domain"
@@ -247,10 +270,10 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only delete local media", channel.json_body["error"])
- def test_missing_parameter(self):
+ def test_missing_parameter(self) -> None:
"""
If the parameter `before_ts` is missing, an error is returned.
"""
@@ -260,13 +283,17 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
self.assertEqual(
"Missing integer query parameter 'before_ts'", channel.json_body["error"]
)
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
@@ -276,7 +303,11 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
self.assertEqual(
"Query parameter before_ts must be a positive integer.",
@@ -289,7 +320,11 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
self.assertEqual(
"Query parameter before_ts you provided is from the year 1970. "
@@ -303,7 +338,11 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
self.assertEqual(
"Query parameter size_gt must be a string representing a positive integer.",
@@ -316,14 +355,18 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
self.assertEqual(
"Boolean query parameter 'keep_profiles' must be one of ['true', 'false']",
channel.json_body["error"],
)
- def test_delete_media_never_accessed(self):
+ def test_delete_media_never_accessed(self) -> None:
"""
Tests that media deleted if it is older than `before_ts` and never accessed
`last_access_ts` is `NULL` and `created_ts` < `before_ts`
@@ -345,7 +388,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self.url + "?before_ts=" + str(now_ms),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(1, channel.json_body["total"])
self.assertEqual(
media_id,
@@ -354,7 +397,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self._access_media(server_and_media_id, False)
- def test_keep_media_by_date(self):
+ def test_keep_media_by_date(self) -> None:
"""
Tests that media is not deleted if it is newer than `before_ts`
"""
@@ -370,7 +413,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self.url + "?before_ts=" + str(now_ms),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self._access_media(server_and_media_id)
@@ -382,7 +425,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self.url + "?before_ts=" + str(now_ms),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(1, channel.json_body["total"])
self.assertEqual(
server_and_media_id.split("/")[1],
@@ -391,7 +434,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self._access_media(server_and_media_id, False)
- def test_keep_media_by_size(self):
+ def test_keep_media_by_size(self) -> None:
"""
Tests that media is not deleted if its size is smaller than or equal
to `size_gt`
@@ -406,7 +449,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self.url + "?before_ts=" + str(now_ms) + "&size_gt=67",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self._access_media(server_and_media_id)
@@ -417,7 +460,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self.url + "?before_ts=" + str(now_ms) + "&size_gt=66",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(1, channel.json_body["total"])
self.assertEqual(
server_and_media_id.split("/")[1],
@@ -426,7 +469,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self._access_media(server_and_media_id, False)
- def test_keep_media_by_user_avatar(self):
+ def test_keep_media_by_user_avatar(self) -> None:
"""
Tests that we do not delete media if is used as a user avatar
Tests parameter `keep_profiles`
@@ -439,10 +482,10 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"PUT",
"/profile/%s/avatar_url" % (self.admin_user,),
- content=json.dumps({"avatar_url": "mxc://%s" % (server_and_media_id,)}),
+ content={"avatar_url": "mxc://%s" % (server_and_media_id,)},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
now_ms = self.clock.time_msec()
channel = self.make_request(
@@ -450,7 +493,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self.url + "?before_ts=" + str(now_ms) + "&keep_profiles=true",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self._access_media(server_and_media_id)
@@ -461,7 +504,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self.url + "?before_ts=" + str(now_ms) + "&keep_profiles=false",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(1, channel.json_body["total"])
self.assertEqual(
server_and_media_id.split("/")[1],
@@ -470,7 +513,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self._access_media(server_and_media_id, False)
- def test_keep_media_by_room_avatar(self):
+ def test_keep_media_by_room_avatar(self) -> None:
"""
Tests that we do not delete media if it is used as a room avatar
Tests parameter `keep_profiles`
@@ -484,10 +527,10 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"PUT",
"/rooms/%s/state/m.room.avatar" % (room_id,),
- content=json.dumps({"url": "mxc://%s" % (server_and_media_id,)}),
+ content={"url": "mxc://%s" % (server_and_media_id,)},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
now_ms = self.clock.time_msec()
channel = self.make_request(
@@ -495,7 +538,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self.url + "?before_ts=" + str(now_ms) + "&keep_profiles=true",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self._access_media(server_and_media_id)
@@ -506,7 +549,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self.url + "?before_ts=" + str(now_ms) + "&keep_profiles=false",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(1, channel.json_body["total"])
self.assertEqual(
server_and_media_id.split("/")[1],
@@ -515,7 +558,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self._access_media(server_and_media_id, False)
- def _create_media(self):
+ def _create_media(self) -> str:
"""
Create a media and return media_id and server_and_media_id
"""
@@ -523,7 +566,10 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
# Upload some media into the room
response = self.helper.upload_media(
- upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200
+ upload_resource,
+ SMALL_PNG,
+ tok=self.admin_user_tok,
+ expect_code=HTTPStatus.OK,
)
# Extract media ID from the response
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
@@ -534,7 +580,7 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
return server_and_media_id
- def _access_media(self, server_and_media_id, expect_success=True):
+ def _access_media(self, server_and_media_id, expect_success=True) -> None:
"""
Try to access a media and check the result
"""
@@ -554,10 +600,10 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
if expect_success:
self.assertEqual(
- 200,
+ HTTPStatus.OK,
channel.code,
msg=(
- "Expected to receive a 200 on accessing media: %s"
+ "Expected to receive a HTTPStatus.OK on accessing media: %s"
% server_and_media_id
),
)
@@ -565,10 +611,10 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self.assertTrue(os.path.exists(local_path))
else:
self.assertEqual(
- 404,
+ HTTPStatus.NOT_FOUND,
channel.code,
msg=(
- "Expected to receive a 404 on accessing deleted media: %s"
+ "Expected to receive a HTTPStatus.NOT_FOUND on accessing deleted media: %s"
% (server_and_media_id)
),
)
@@ -584,7 +630,7 @@ class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
media_repo = hs.get_media_repository_resource()
self.store = hs.get_datastore()
self.server_name = hs.hostname
@@ -597,7 +643,10 @@ class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase):
# Upload some media into the room
response = self.helper.upload_media(
- upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200
+ upload_resource,
+ SMALL_PNG,
+ tok=self.admin_user_tok,
+ expect_code=HTTPStatus.OK,
)
# Extract media ID from the response
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
@@ -606,7 +655,7 @@ class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase):
self.url = "/_synapse/admin/v1/media/%s/%s/%s"
@parameterized.expand(["quarantine", "unquarantine"])
- def test_no_auth(self, action: str):
+ def test_no_auth(self, action: str) -> None:
"""
Try to protect media without authentication.
"""
@@ -617,11 +666,15 @@ class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase):
b"{}",
)
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["quarantine", "unquarantine"])
- def test_requester_is_no_admin(self, action: str):
+ def test_requester_is_no_admin(self, action: str) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -634,10 +687,14 @@ class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.other_user_token,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_quarantine_media(self):
+ def test_quarantine_media(self) -> None:
"""
Tests that quarantining and remove from quarantine a media is successfully
"""
@@ -652,7 +709,7 @@ class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertFalse(channel.json_body)
media_info = self.get_success(self.store.get_local_media(self.media_id))
@@ -665,13 +722,13 @@ class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertFalse(channel.json_body)
media_info = self.get_success(self.store.get_local_media(self.media_id))
self.assertFalse(media_info["quarantined_by"])
- def test_quarantine_protected_media(self):
+ def test_quarantine_protected_media(self) -> None:
"""
Tests that quarantining from protected media fails
"""
@@ -690,7 +747,7 @@ class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertFalse(channel.json_body)
# verify that is not in quarantine
@@ -706,7 +763,7 @@ class ProtectMediaByIDTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
media_repo = hs.get_media_repository_resource()
self.store = hs.get_datastore()
@@ -718,7 +775,10 @@ class ProtectMediaByIDTestCase(unittest.HomeserverTestCase):
# Upload some media into the room
response = self.helper.upload_media(
- upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200
+ upload_resource,
+ SMALL_PNG,
+ tok=self.admin_user_tok,
+ expect_code=HTTPStatus.OK,
)
# Extract media ID from the response
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
@@ -727,18 +787,22 @@ class ProtectMediaByIDTestCase(unittest.HomeserverTestCase):
self.url = "/_synapse/admin/v1/media/%s/%s"
@parameterized.expand(["protect", "unprotect"])
- def test_no_auth(self, action: str):
+ def test_no_auth(self, action: str) -> None:
"""
Try to protect media without authentication.
"""
channel = self.make_request("POST", self.url % (action, self.media_id), b"{}")
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["protect", "unprotect"])
- def test_requester_is_no_admin(self, action: str):
+ def test_requester_is_no_admin(self, action: str) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -751,10 +815,14 @@ class ProtectMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.other_user_token,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_protect_media(self):
+ def test_protect_media(self) -> None:
"""
Tests that protect and unprotect a media is successfully
"""
@@ -769,7 +837,7 @@ class ProtectMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertFalse(channel.json_body)
media_info = self.get_success(self.store.get_local_media(self.media_id))
@@ -782,7 +850,7 @@ class ProtectMediaByIDTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertFalse(channel.json_body)
media_info = self.get_success(self.store.get_local_media(self.media_id))
@@ -799,7 +867,7 @@ class PurgeMediaCacheTestCase(unittest.HomeserverTestCase):
room.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.media_repo = hs.get_media_repository_resource()
self.server_name = hs.hostname
@@ -809,17 +877,21 @@ class PurgeMediaCacheTestCase(unittest.HomeserverTestCase):
self.filepaths = MediaFilePaths(hs.config.media.media_store_path)
self.url = "/_synapse/admin/v1/purge_media_cache"
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to delete media without authentication.
"""
channel = self.make_request("POST", self.url, b"{}")
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_not_admin(self):
+ def test_requester_is_not_admin(self) -> None:
"""
If the user is not a server admin, an error is returned.
"""
@@ -832,10 +904,14 @@ class PurgeMediaCacheTestCase(unittest.HomeserverTestCase):
access_token=self.other_user_token,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
@@ -845,7 +921,11 @@ class PurgeMediaCacheTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
self.assertEqual(
"Query parameter before_ts must be a positive integer.",
@@ -858,7 +938,11 @@ class PurgeMediaCacheTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
self.assertEqual(
"Query parameter before_ts you provided is from the year 1970. "
diff --git a/tests/rest/admin/test_registration_tokens.py b/tests/rest/admin/test_registration_tokens.py
index 9bac423ae0..350a62dda6 100644
--- a/tests/rest/admin/test_registration_tokens.py
+++ b/tests/rest/admin/test_registration_tokens.py
@@ -11,13 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import random
import string
+from http import HTTPStatus
+
+from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login
+from synapse.server import HomeServer
+from synapse.util import Clock
from tests import unittest
@@ -28,7 +32,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -38,7 +42,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
self.url = "/_synapse/admin/v1/registration_tokens"
- def _new_token(self, **kwargs):
+ def _new_token(self, **kwargs) -> str:
"""Helper function to create a token."""
token = kwargs.get(
"token",
@@ -60,13 +64,17 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
# CREATION
- def test_create_no_auth(self):
+ def test_create_no_auth(self) -> None:
"""Try to create a token without authentication."""
channel = self.make_request("POST", self.url + "/new", {})
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_create_requester_not_admin(self):
+ def test_create_requester_not_admin(self) -> None:
"""Try to create a token while not an admin."""
channel = self.make_request(
"POST",
@@ -74,10 +82,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{},
access_token=self.other_user_tok,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_create_using_defaults(self):
+ def test_create_using_defaults(self) -> None:
"""Create a token using all the defaults."""
channel = self.make_request(
"POST",
@@ -86,14 +98,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(len(channel.json_body["token"]), 16)
self.assertIsNone(channel.json_body["uses_allowed"])
self.assertIsNone(channel.json_body["expiry_time"])
self.assertEqual(channel.json_body["pending"], 0)
self.assertEqual(channel.json_body["completed"], 0)
- def test_create_specifying_fields(self):
+ def test_create_specifying_fields(self) -> None:
"""Create a token specifying the value of all fields."""
# As many of the allowed characters as possible with length <= 64
token = "adefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789._~-"
@@ -110,14 +122,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["token"], token)
self.assertEqual(channel.json_body["uses_allowed"], 1)
self.assertEqual(channel.json_body["expiry_time"], data["expiry_time"])
self.assertEqual(channel.json_body["pending"], 0)
self.assertEqual(channel.json_body["completed"], 0)
- def test_create_with_null_value(self):
+ def test_create_with_null_value(self) -> None:
"""Create a token specifying unlimited uses and no expiry."""
data = {
"uses_allowed": None,
@@ -131,14 +143,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(len(channel.json_body["token"]), 16)
self.assertIsNone(channel.json_body["uses_allowed"])
self.assertIsNone(channel.json_body["expiry_time"])
self.assertEqual(channel.json_body["pending"], 0)
self.assertEqual(channel.json_body["completed"], 0)
- def test_create_token_too_long(self):
+ def test_create_token_too_long(self) -> None:
"""Check token longer than 64 chars is invalid."""
data = {"token": "a" * 65}
@@ -149,10 +161,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
- def test_create_token_invalid_chars(self):
+ def test_create_token_invalid_chars(self) -> None:
"""Check you can't create token with invalid characters."""
data = {
"token": "abc/def",
@@ -165,10 +181,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
- def test_create_token_already_exists(self):
+ def test_create_token_already_exists(self) -> None:
"""Check you can't create token that already exists."""
data = {
"token": "abcd",
@@ -180,7 +200,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
data,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel1.result["code"]), msg=channel1.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel1.code, msg=channel1.json_body)
channel2 = self.make_request(
"POST",
@@ -188,10 +208,10 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
data,
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel2.result["code"]), msg=channel2.result["body"])
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel2.code, msg=channel2.json_body)
self.assertEqual(channel2.json_body["errcode"], Codes.INVALID_PARAM)
- def test_create_unable_to_generate_token(self):
+ def test_create_unable_to_generate_token(self) -> None:
"""Check right error is raised when server can't generate unique token."""
# Create all possible single character tokens
tokens = []
@@ -220,9 +240,9 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"length": 1},
access_token=self.admin_user_tok,
)
- self.assertEqual(500, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(500, channel.code, msg=channel.json_body)
- def test_create_uses_allowed(self):
+ def test_create_uses_allowed(self) -> None:
"""Check you can only create a token with good values for uses_allowed."""
# Should work with 0 (token is invalid from the start)
channel = self.make_request(
@@ -231,7 +251,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"uses_allowed": 0},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["uses_allowed"], 0)
# Should fail with negative integer
@@ -241,7 +261,11 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"uses_allowed": -5},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
# Should fail with float
@@ -251,10 +275,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"uses_allowed": 1.5},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
- def test_create_expiry_time(self):
+ def test_create_expiry_time(self) -> None:
"""Check you can't create a token with an invalid expiry_time."""
# Should fail with a time in the past
channel = self.make_request(
@@ -263,7 +291,11 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"expiry_time": self.clock.time_msec() - 10000},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
# Should fail with float
@@ -273,10 +305,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"expiry_time": self.clock.time_msec() + 1000000.5},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
- def test_create_length(self):
+ def test_create_length(self) -> None:
"""Check you can only generate a token with a valid length."""
# Should work with 64
channel = self.make_request(
@@ -285,7 +321,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"length": 64},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(len(channel.json_body["token"]), 64)
# Should fail with 0
@@ -295,7 +331,11 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"length": 0},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
# Should fail with a negative integer
@@ -305,7 +345,11 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"length": -5},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
# Should fail with a float
@@ -315,7 +359,11 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"length": 8.5},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
# Should fail with 65
@@ -325,22 +373,30 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"length": 65},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
# UPDATING
- def test_update_no_auth(self):
+ def test_update_no_auth(self) -> None:
"""Try to update a token without authentication."""
channel = self.make_request(
"PUT",
self.url + "/1234", # Token doesn't exist but that doesn't matter
{},
)
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_update_requester_not_admin(self):
+ def test_update_requester_not_admin(self) -> None:
"""Try to update a token while not an admin."""
channel = self.make_request(
"PUT",
@@ -348,10 +404,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{},
access_token=self.other_user_tok,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_update_non_existent(self):
+ def test_update_non_existent(self) -> None:
"""Try to update a token that doesn't exist."""
channel = self.make_request(
"PUT",
@@ -360,10 +420,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.NOT_FOUND,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.NOT_FOUND)
- def test_update_uses_allowed(self):
+ def test_update_uses_allowed(self) -> None:
"""Test updating just uses_allowed."""
# Create new token using default values
token = self._new_token()
@@ -375,7 +439,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"uses_allowed": 1},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["uses_allowed"], 1)
self.assertIsNone(channel.json_body["expiry_time"])
@@ -386,7 +450,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"uses_allowed": 0},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["uses_allowed"], 0)
self.assertIsNone(channel.json_body["expiry_time"])
@@ -397,7 +461,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"uses_allowed": None},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertIsNone(channel.json_body["uses_allowed"])
self.assertIsNone(channel.json_body["expiry_time"])
@@ -408,7 +472,11 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"uses_allowed": 1.5},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
# Should fail with a negative integer
@@ -418,10 +486,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"uses_allowed": -5},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
- def test_update_expiry_time(self):
+ def test_update_expiry_time(self) -> None:
"""Test updating just expiry_time."""
# Create new token using default values
token = self._new_token()
@@ -434,7 +506,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"expiry_time": new_expiry_time},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["expiry_time"], new_expiry_time)
self.assertIsNone(channel.json_body["uses_allowed"])
@@ -445,7 +517,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"expiry_time": None},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertIsNone(channel.json_body["expiry_time"])
self.assertIsNone(channel.json_body["uses_allowed"])
@@ -457,7 +529,11 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"expiry_time": past_time},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
# Should fail a float
@@ -467,10 +543,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{"expiry_time": new_expiry_time + 0.5},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
- def test_update_both(self):
+ def test_update_both(self) -> None:
"""Test updating both uses_allowed and expiry_time."""
# Create new token using default values
token = self._new_token()
@@ -488,11 +568,11 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["uses_allowed"], 1)
self.assertEqual(channel.json_body["expiry_time"], new_expiry_time)
- def test_update_invalid_type(self):
+ def test_update_invalid_type(self) -> None:
"""Test using invalid types doesn't work."""
# Create new token using default values
token = self._new_token()
@@ -509,22 +589,30 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.INVALID_PARAM)
# DELETING
- def test_delete_no_auth(self):
+ def test_delete_no_auth(self) -> None:
"""Try to delete a token without authentication."""
channel = self.make_request(
"DELETE",
self.url + "/1234", # Token doesn't exist but that doesn't matter
{},
)
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_delete_requester_not_admin(self):
+ def test_delete_requester_not_admin(self) -> None:
"""Try to delete a token while not an admin."""
channel = self.make_request(
"DELETE",
@@ -532,10 +620,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{},
access_token=self.other_user_tok,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_delete_non_existent(self):
+ def test_delete_non_existent(self) -> None:
"""Try to delete a token that doesn't exist."""
channel = self.make_request(
"DELETE",
@@ -544,10 +636,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.NOT_FOUND,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.NOT_FOUND)
- def test_delete(self):
+ def test_delete(self) -> None:
"""Test deleting a token."""
# Create new token using default values
token = self._new_token()
@@ -559,21 +655,25 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# GETTING ONE
- def test_get_no_auth(self):
+ def test_get_no_auth(self) -> None:
"""Try to get a token without authentication."""
channel = self.make_request(
"GET",
self.url + "/1234", # Token doesn't exist but that doesn't matter
{},
)
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_get_requester_not_admin(self):
+ def test_get_requester_not_admin(self) -> None:
"""Try to get a token while not an admin."""
channel = self.make_request(
"GET",
@@ -581,10 +681,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{},
access_token=self.other_user_tok,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_get_non_existent(self):
+ def test_get_non_existent(self) -> None:
"""Try to get a token that doesn't exist."""
channel = self.make_request(
"GET",
@@ -593,10 +697,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.NOT_FOUND,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], Codes.NOT_FOUND)
- def test_get(self):
+ def test_get(self) -> None:
"""Test getting a token."""
# Create new token using default values
token = self._new_token()
@@ -608,7 +716,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["token"], token)
self.assertIsNone(channel.json_body["uses_allowed"])
self.assertIsNone(channel.json_body["expiry_time"])
@@ -617,13 +725,17 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
# LISTING
- def test_list_no_auth(self):
+ def test_list_no_auth(self) -> None:
"""Try to list tokens without authentication."""
channel = self.make_request("GET", self.url, {})
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_list_requester_not_admin(self):
+ def test_list_requester_not_admin(self) -> None:
"""Try to list tokens while not an admin."""
channel = self.make_request(
"GET",
@@ -631,10 +743,14 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
{},
access_token=self.other_user_tok,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_list_all(self):
+ def test_list_all(self) -> None:
"""Test listing all tokens."""
# Create new token using default values
token = self._new_token()
@@ -646,7 +762,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(len(channel.json_body["registration_tokens"]), 1)
token_info = channel.json_body["registration_tokens"][0]
self.assertEqual(token_info["token"], token)
@@ -655,7 +771,7 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
self.assertEqual(token_info["pending"], 0)
self.assertEqual(token_info["completed"], 0)
- def test_list_invalid_query_parameter(self):
+ def test_list_invalid_query_parameter(self) -> None:
"""Test with `valid` query parameter not `true` or `false`."""
channel = self.make_request(
"GET",
@@ -664,9 +780,13 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
- def _test_list_query_parameter(self, valid: str):
+ def _test_list_query_parameter(self, valid: str) -> None:
"""Helper used to test both valid=true and valid=false."""
# Create 2 valid and 2 invalid tokens.
now = self.hs.get_clock().time_msec()
@@ -696,17 +816,17 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(len(channel.json_body["registration_tokens"]), 2)
token_info_1 = channel.json_body["registration_tokens"][0]
token_info_2 = channel.json_body["registration_tokens"][1]
self.assertIn(token_info_1["token"], tokens)
self.assertIn(token_info_2["token"], tokens)
- def test_list_valid(self):
+ def test_list_valid(self) -> None:
"""Test listing just valid tokens."""
self._test_list_query_parameter(valid="true")
- def test_list_invalid(self):
+ def test_list_invalid(self) -> None:
"""Test listing just invalid tokens."""
self._test_list_query_parameter(valid="false")
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index 07077aff78..22f9aa6234 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -11,8 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import json
import urllib.parse
from http import HTTPStatus
from typing import List, Optional
@@ -20,11 +18,15 @@ from unittest.mock import Mock
from parameterized import parameterized
+from twisted.test.proto_helpers import MemoryReactor
+
import synapse.rest.admin
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import Codes
from synapse.handlers.pagination import PaginationHandler
from synapse.rest.client import directory, events, login, room
+from synapse.server import HomeServer
+from synapse.util import Clock
from tests import unittest
@@ -40,7 +42,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
room.register_deprecated_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_creation_handler = hs.get_event_creation_handler()
hs.config.consent.user_consent_version = "1"
@@ -66,7 +68,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
def test_requester_is_no_admin(self):
"""
- If the user is not a server admin, an error 403 is returned.
+ If the user is not a server admin, an error HTTPStatus.FORBIDDEN is returned.
"""
channel = self.make_request(
@@ -76,12 +78,12 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
access_token=self.other_user_tok,
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_room_does_not_exist(self):
"""
- Check that unknown rooms/server return error 404.
+ Check that unknown rooms/server return 200
"""
url = "/_synapse/admin/v1/rooms/%s" % "!unknown:test"
@@ -92,12 +94,11 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
- self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
def test_room_is_not_valid(self):
"""
- Check that invalid room names, return an error 400.
+ Check that invalid room names, return an error HTTPStatus.BAD_REQUEST.
"""
url = "/_synapse/admin/v1/rooms/%s" % "invalidroom"
@@ -108,7 +109,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(
"invalidroom is not a legal room ID",
channel.json_body["error"],
@@ -118,16 +119,15 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
"""
Tests that the user ID must be from local server but it does not have to exist.
"""
- body = json.dumps({"new_room_user_id": "@unknown:test"})
channel = self.make_request(
"DELETE",
self.url,
- content=body,
+ content={"new_room_user_id": "@unknown:test"},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertIn("new_room_id", channel.json_body)
self.assertIn("kicked_users", channel.json_body)
self.assertIn("failed_to_kick_users", channel.json_body)
@@ -137,16 +137,15 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
"""
Check that only local users can create new room to move members.
"""
- body = json.dumps({"new_room_user_id": "@not:exist.bla"})
channel = self.make_request(
"DELETE",
self.url,
- content=body,
+ content={"new_room_user_id": "@not:exist.bla"},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(
"User must be our own: @not:exist.bla",
channel.json_body["error"],
@@ -156,32 +155,30 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
"""
If parameter `block` is not boolean, return an error
"""
- body = json.dumps({"block": "NotBool"})
channel = self.make_request(
"DELETE",
self.url,
- content=body,
+ content={"block": "NotBool"},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
def test_purge_is_not_bool(self):
"""
If parameter `purge` is not boolean, return an error
"""
- body = json.dumps({"purge": "NotBool"})
channel = self.make_request(
"DELETE",
self.url,
- content=body,
+ content={"purge": "NotBool"},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
def test_purge_room_and_block(self):
@@ -198,16 +195,14 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
# Assert one user in room
self._is_member(room_id=self.room_id, user_id=self.other_user)
- body = json.dumps({"block": True, "purge": True})
-
channel = self.make_request(
"DELETE",
self.url.encode("ascii"),
- content=body,
+ content={"block": True, "purge": True},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(None, channel.json_body["new_room_id"])
self.assertEqual(self.other_user, channel.json_body["kicked_users"][0])
self.assertIn("failed_to_kick_users", channel.json_body)
@@ -231,16 +226,14 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
# Assert one user in room
self._is_member(room_id=self.room_id, user_id=self.other_user)
- body = json.dumps({"block": False, "purge": True})
-
channel = self.make_request(
"DELETE",
self.url.encode("ascii"),
- content=body,
+ content={"block": False, "purge": True},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(None, channel.json_body["new_room_id"])
self.assertEqual(self.other_user, channel.json_body["kicked_users"][0])
self.assertIn("failed_to_kick_users", channel.json_body)
@@ -265,16 +258,14 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
# Assert one user in room
self._is_member(room_id=self.room_id, user_id=self.other_user)
- body = json.dumps({"block": True, "purge": False})
-
channel = self.make_request(
"DELETE",
self.url.encode("ascii"),
- content=body,
+ content={"block": True, "purge": False},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(None, channel.json_body["new_room_id"])
self.assertEqual(self.other_user, channel.json_body["kicked_users"][0])
self.assertIn("failed_to_kick_users", channel.json_body)
@@ -305,9 +296,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
)
# The room is now blocked.
- self.assertEqual(
- HTTPStatus.OK, int(channel.result["code"]), msg=channel.result["body"]
- )
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self._is_blocked(room_id)
def test_shutdown_room_consent(self):
@@ -327,7 +316,10 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
# Assert that the user is getting consent error
self.helper.send(
- self.room_id, body="foo", tok=self.other_user_tok, expect_code=403
+ self.room_id,
+ body="foo",
+ tok=self.other_user_tok,
+ expect_code=HTTPStatus.FORBIDDEN,
)
# Test that room is not purged
@@ -341,11 +333,11 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"DELETE",
self.url,
- json.dumps({"new_room_user_id": self.admin_user}),
+ {"new_room_user_id": self.admin_user},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(self.other_user, channel.json_body["kicked_users"][0])
self.assertIn("new_room_id", channel.json_body)
self.assertIn("failed_to_kick_users", channel.json_body)
@@ -371,10 +363,10 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"PUT",
url.encode("ascii"),
- json.dumps({"history_visibility": "world_readable"}),
+ {"history_visibility": "world_readable"},
access_token=self.other_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Test that room is not purged
with self.assertRaises(AssertionError):
@@ -387,11 +379,11 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"DELETE",
self.url,
- json.dumps({"new_room_user_id": self.admin_user}),
+ {"new_room_user_id": self.admin_user},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(self.other_user, channel.json_body["kicked_users"][0])
self.assertIn("new_room_id", channel.json_body)
self.assertIn("failed_to_kick_users", channel.json_body)
@@ -406,7 +398,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
self._has_no_members(self.room_id)
# Assert we can no longer peek into the room
- self._assert_peek(self.room_id, expect_code=403)
+ self._assert_peek(self.room_id, expect_code=HTTPStatus.FORBIDDEN)
def _is_blocked(self, room_id, expect=True):
"""Assert that the room is blocked or not"""
@@ -465,7 +457,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
room.register_deprecated_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_creation_handler = hs.get_event_creation_handler()
hs.config.consent.user_consent_version = "1"
@@ -502,7 +494,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
)
def test_requester_is_no_admin(self, method: str, url: str):
"""
- If the user is not a server admin, an error 403 is returned.
+ If the user is not a server admin, an error HTTPStatus.FORBIDDEN is returned.
"""
channel = self.make_request(
@@ -515,27 +507,36 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- @parameterized.expand(
- [
- ("DELETE", "/_synapse/admin/v2/rooms/%s"),
- ("GET", "/_synapse/admin/v2/rooms/%s/delete_status"),
- ("GET", "/_synapse/admin/v2/rooms/delete_status/%s"),
- ]
- )
- def test_room_does_not_exist(self, method: str, url: str):
- """
- Check that unknown rooms/server return error 404.
+ def test_room_does_not_exist(self):
"""
+ Check that unknown rooms/server return 200
+ This is important, as it allows incomplete vestiges of rooms to be cleared up
+ even if the create event/etc is missing.
+ """
+ room_id = "!unknown:test"
channel = self.make_request(
- method,
- url % "!unknown:test",
+ "DELETE",
+ f"/_synapse/admin/v2/rooms/{room_id}",
content={},
access_token=self.admin_user_tok,
)
- self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
- self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertIn("delete_id", channel.json_body)
+ delete_id = channel.json_body["delete_id"]
+
+ # get status
+ channel = self.make_request(
+ "GET",
+ f"/_synapse/admin/v2/rooms/{room_id}/delete_status",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(1, len(channel.json_body["results"]))
+ self.assertEqual("complete", channel.json_body["results"][0]["status"])
+ self.assertEqual(delete_id, channel.json_body["results"][0]["delete_id"])
@parameterized.expand(
[
@@ -545,7 +546,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
)
def test_room_is_not_valid(self, method: str, url: str):
"""
- Check that invalid room names, return an error 400.
+ Check that invalid room names, return an error HTTPStatus.BAD_REQUEST.
"""
channel = self.make_request(
@@ -854,7 +855,10 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
# Assert that the user is getting consent error
self.helper.send(
- self.room_id, body="foo", tok=self.other_user_tok, expect_code=403
+ self.room_id,
+ body="foo",
+ tok=self.other_user_tok,
+ expect_code=HTTPStatus.FORBIDDEN,
)
# Test that room is not purged
@@ -951,7 +955,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self._has_no_members(self.room_id)
# Assert we can no longer peek into the room
- self._assert_peek(self.room_id, expect_code=403)
+ self._assert_peek(self.room_id, expect_code=HTTPStatus.FORBIDDEN)
def _is_blocked(self, room_id: str, expect: bool = True) -> None:
"""Assert that the room is blocked or not"""
@@ -1069,12 +1073,12 @@ class RoomTestCase(unittest.HomeserverTestCase):
directory.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
# Create user
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
- def test_list_rooms(self):
+ def test_list_rooms(self) -> None:
"""Test that we can list rooms"""
# Create 3 test rooms
total_rooms = 3
@@ -1094,7 +1098,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
)
# Check request completed successfully
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Check that response json body contains a "rooms" key
self.assertTrue(
@@ -1138,7 +1142,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
# We shouldn't receive a next token here as there's no further rooms to show
self.assertNotIn("next_batch", channel.json_body)
- def test_list_rooms_pagination(self):
+ def test_list_rooms_pagination(self) -> None:
"""Test that we can get a full list of rooms through pagination"""
# Create 5 test rooms
total_rooms = 5
@@ -1178,7 +1182,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertTrue("rooms" in channel.json_body)
for r in channel.json_body["rooms"]:
@@ -1218,9 +1222,9 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
- def test_correct_room_attributes(self):
+ def test_correct_room_attributes(self) -> None:
"""Test the correct attributes for a room are returned"""
# Create a test room
room_id = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
@@ -1241,7 +1245,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
{"room_id": room_id},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Set this new alias as the canonical alias for this room
self.helper.send_state(
@@ -1273,7 +1277,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Check that rooms were returned
self.assertTrue("rooms" in channel.json_body)
@@ -1301,7 +1305,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(test_room_name, r["name"])
self.assertEqual(test_alias, r["canonical_alias"])
- def test_room_list_sort_order(self):
+ def test_room_list_sort_order(self) -> None:
"""Test room list sort ordering. alphabetical name versus number of members,
reversing the order, etc.
"""
@@ -1310,7 +1314,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
order_type: str,
expected_room_list: List[str],
reverse: bool = False,
- ):
+ ) -> None:
"""Request the list of rooms in a certain order. Assert that order is what
we expect
@@ -1328,7 +1332,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Check that rooms were returned
self.assertTrue("rooms" in channel.json_body)
@@ -1439,7 +1443,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
_order_test("state_events", [room_id_3, room_id_2, room_id_1])
_order_test("state_events", [room_id_1, room_id_2, room_id_3], reverse=True)
- def test_search_term(self):
+ def test_search_term(self) -> None:
"""Test that searching for a room works correctly"""
# Create two test rooms
room_id_1 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
@@ -1467,8 +1471,8 @@ class RoomTestCase(unittest.HomeserverTestCase):
def _search_test(
expected_room_id: Optional[str],
search_term: str,
- expected_http_code: int = 200,
- ):
+ expected_http_code: int = HTTPStatus.OK,
+ ) -> None:
"""Search for a room and check that the returned room's id is a match
Args:
@@ -1485,7 +1489,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
)
self.assertEqual(expected_http_code, channel.code, msg=channel.json_body)
- if expected_http_code != 200:
+ if expected_http_code != HTTPStatus.OK:
return
# Check that rooms were returned
@@ -1528,7 +1532,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
_search_test(None, "foo")
_search_test(None, "bar")
- _search_test(None, "", expected_http_code=400)
+ _search_test(None, "", expected_http_code=HTTPStatus.BAD_REQUEST)
# Test that the whole room id returns the room
_search_test(room_id_1, room_id_1)
@@ -1542,7 +1546,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
# Test search local part of alias
_search_test(room_id_1, "alias1")
- def test_search_term_non_ascii(self):
+ def test_search_term_non_ascii(self) -> None:
"""Test that searching for a room with non-ASCII characters works correctly"""
# Create test room
@@ -1565,11 +1569,11 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(room_id, channel.json_body.get("rooms")[0].get("room_id"))
self.assertEqual("ж", channel.json_body.get("rooms")[0].get("name"))
- def test_single_room(self):
+ def test_single_room(self) -> None:
"""Test that a single room can be requested correctly"""
# Create two test rooms
room_id_1 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
@@ -1598,7 +1602,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertIn("room_id", channel.json_body)
self.assertIn("name", channel.json_body)
@@ -1620,7 +1624,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(room_id_1, channel.json_body["room_id"])
- def test_single_room_devices(self):
+ def test_single_room_devices(self) -> None:
"""Test that `joined_local_devices` can be requested correctly"""
room_id_1 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
@@ -1630,7 +1634,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(1, channel.json_body["joined_local_devices"])
# Have another user join the room
@@ -1644,7 +1648,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(2, channel.json_body["joined_local_devices"])
# leave room
@@ -1656,10 +1660,10 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["joined_local_devices"])
- def test_room_members(self):
+ def test_room_members(self) -> None:
"""Test that room members can be requested correctly"""
# Create two test rooms
room_id_1 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
@@ -1687,7 +1691,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertCountEqual(
["@admin:test", "@foo:test", "@bar:test"], channel.json_body["members"]
@@ -1700,14 +1704,14 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertCountEqual(
["@admin:test", "@bar:test", "@foobar:test"], channel.json_body["members"]
)
self.assertEqual(channel.json_body["total"], 3)
- def test_room_state(self):
+ def test_room_state(self) -> None:
"""Test that room state can be requested correctly"""
# Create two test rooms
room_id = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
@@ -1718,13 +1722,15 @@ class RoomTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertIn("state", channel.json_body)
# testing that the state events match is painful and not done here. We assume that
# the create_room already does the right thing, so no need to verify that we got
# the state events it created.
- def _set_canonical_alias(self, room_id: str, test_alias: str, admin_user_tok: str):
+ def _set_canonical_alias(
+ self, room_id: str, test_alias: str, admin_user_tok: str
+ ) -> None:
# Create a new alias to this room
url = "/_matrix/client/r0/directory/room/%s" % (urllib.parse.quote(test_alias),)
channel = self.make_request(
@@ -1733,7 +1739,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
{"room_id": room_id},
access_token=admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Set this new alias as the canonical alias for this room
self.helper.send_state(
@@ -1759,7 +1765,7 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, homeserver):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -1774,124 +1780,117 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
)
self.url = f"/_synapse/admin/v1/join/{self.public_room_id}"
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
- If the user is not a server admin, an error 403 is returned.
+ If the user is not a server admin, an error HTTPStatus.FORBIDDEN is returned.
"""
- body = json.dumps({"user_id": self.second_user_id})
channel = self.make_request(
"POST",
self.url,
- content=body,
+ content={"user_id": self.second_user_id},
access_token=self.second_tok,
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""
If a parameter is missing, return an error
"""
- body = json.dumps({"unknown_parameter": "@unknown:test"})
channel = self.make_request(
"POST",
self.url,
- content=body,
+ content={"unknown_parameter": "@unknown:test"},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
- def test_local_user_does_not_exist(self):
+ def test_local_user_does_not_exist(self) -> None:
"""
- Tests that a lookup for a user that does not exist returns a 404
+ Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
- body = json.dumps({"user_id": "@unknown:test"})
channel = self.make_request(
"POST",
self.url,
- content=body,
+ content={"user_id": "@unknown:test"},
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
- def test_remote_user(self):
+ def test_remote_user(self) -> None:
"""
Check that only local user can join rooms.
"""
- body = json.dumps({"user_id": "@not:exist.bla"})
channel = self.make_request(
"POST",
self.url,
- content=body,
+ content={"user_id": "@not:exist.bla"},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(
"This endpoint can only be used with local users",
channel.json_body["error"],
)
- def test_room_does_not_exist(self):
+ def test_room_does_not_exist(self) -> None:
"""
- Check that unknown rooms/server return error 404.
+ Check that unknown rooms/server return error HTTPStatus.NOT_FOUND.
"""
- body = json.dumps({"user_id": self.second_user_id})
url = "/_synapse/admin/v1/join/!unknown:test"
channel = self.make_request(
"POST",
url,
- content=body,
+ content={"user_id": self.second_user_id},
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual("No known servers", channel.json_body["error"])
- def test_room_is_not_valid(self):
+ def test_room_is_not_valid(self) -> None:
"""
- Check that invalid room names, return an error 400.
+ Check that invalid room names, return an error HTTPStatus.BAD_REQUEST.
"""
- body = json.dumps({"user_id": self.second_user_id})
url = "/_synapse/admin/v1/join/invalidroom"
channel = self.make_request(
"POST",
url,
- content=body,
+ content={"user_id": self.second_user_id},
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(
"invalidroom was not legal room ID or room alias",
channel.json_body["error"],
)
- def test_join_public_room(self):
+ def test_join_public_room(self) -> None:
"""
Test joining a local user to a public room with "JoinRules.PUBLIC"
"""
- body = json.dumps({"user_id": self.second_user_id})
channel = self.make_request(
"POST",
self.url,
- content=body,
+ content={"user_id": self.second_user_id},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(self.public_room_id, channel.json_body["room_id"])
# Validate if user is a member of the room
@@ -1901,10 +1900,10 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
"/_matrix/client/r0/joined_rooms",
access_token=self.second_tok,
)
- self.assertEquals(200, channel.code, msg=channel.json_body)
+ self.assertEquals(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(self.public_room_id, channel.json_body["joined_rooms"][0])
- def test_join_private_room_if_not_member(self):
+ def test_join_private_room_if_not_member(self) -> None:
"""
Test joining a local user to a private room with "JoinRules.INVITE"
when server admin is not member of this room.
@@ -1913,19 +1912,18 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
self.creator, tok=self.creator_tok, is_public=False
)
url = f"/_synapse/admin/v1/join/{private_room_id}"
- body = json.dumps({"user_id": self.second_user_id})
channel = self.make_request(
"POST",
url,
- content=body,
+ content={"user_id": self.second_user_id},
access_token=self.admin_user_tok,
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_join_private_room_if_member(self):
+ def test_join_private_room_if_member(self) -> None:
"""
Test joining a local user to a private room with "JoinRules.INVITE",
when server admin is member of this room.
@@ -1950,21 +1948,20 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
"/_matrix/client/r0/joined_rooms",
access_token=self.admin_user_tok,
)
- self.assertEquals(200, channel.code, msg=channel.json_body)
+ self.assertEquals(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(private_room_id, channel.json_body["joined_rooms"][0])
# Join user to room.
url = f"/_synapse/admin/v1/join/{private_room_id}"
- body = json.dumps({"user_id": self.second_user_id})
channel = self.make_request(
"POST",
url,
- content=body,
+ content={"user_id": self.second_user_id},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(private_room_id, channel.json_body["room_id"])
# Validate if user is a member of the room
@@ -1974,10 +1971,10 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
"/_matrix/client/r0/joined_rooms",
access_token=self.second_tok,
)
- self.assertEquals(200, channel.code, msg=channel.json_body)
+ self.assertEquals(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(private_room_id, channel.json_body["joined_rooms"][0])
- def test_join_private_room_if_owner(self):
+ def test_join_private_room_if_owner(self) -> None:
"""
Test joining a local user to a private room with "JoinRules.INVITE",
when server admin is owner of this room.
@@ -1986,16 +1983,15 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
self.admin_user, tok=self.admin_user_tok, is_public=False
)
url = f"/_synapse/admin/v1/join/{private_room_id}"
- body = json.dumps({"user_id": self.second_user_id})
channel = self.make_request(
"POST",
url,
- content=body,
+ content={"user_id": self.second_user_id},
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(private_room_id, channel.json_body["room_id"])
# Validate if user is a member of the room
@@ -2005,10 +2001,10 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
"/_matrix/client/r0/joined_rooms",
access_token=self.second_tok,
)
- self.assertEquals(200, channel.code, msg=channel.json_body)
+ self.assertEquals(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(private_room_id, channel.json_body["joined_rooms"][0])
- def test_context_as_non_admin(self):
+ def test_context_as_non_admin(self) -> None:
"""
Test that, without being admin, one cannot use the context admin API
"""
@@ -2039,10 +2035,10 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
% (room_id, events[midway]["event_id"]),
access_token=tok,
)
- self.assertEquals(403, channel.code, msg=channel.json_body)
+ self.assertEquals(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_context_as_admin(self):
+ def test_context_as_admin(self) -> None:
"""
Test that, as admin, we can find the context of an event without having joined the room.
"""
@@ -2069,7 +2065,7 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
% (room_id, events[midway]["event_id"]),
access_token=self.admin_user_tok,
)
- self.assertEquals(200, channel.code, msg=channel.json_body)
+ self.assertEquals(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEquals(
channel.json_body["event"]["event_id"], events[midway]["event_id"]
)
@@ -2098,7 +2094,7 @@ class MakeRoomAdminTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, homeserver):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
@@ -2115,7 +2111,7 @@ class MakeRoomAdminTestCase(unittest.HomeserverTestCase):
self.public_room_id
)
- def test_public_room(self):
+ def test_public_room(self) -> None:
"""Test that getting admin in a public room works."""
room_id = self.helper.create_room_as(
self.creator, tok=self.creator_tok, is_public=True
@@ -2128,7 +2124,7 @@ class MakeRoomAdminTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Now we test that we can join the room and ban a user.
self.helper.join(room_id, self.admin_user, tok=self.admin_user_tok)
@@ -2140,7 +2136,7 @@ class MakeRoomAdminTestCase(unittest.HomeserverTestCase):
tok=self.admin_user_tok,
)
- def test_private_room(self):
+ def test_private_room(self) -> None:
"""Test that getting admin in a private room works and we get invited."""
room_id = self.helper.create_room_as(
self.creator,
@@ -2155,7 +2151,7 @@ class MakeRoomAdminTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Now we test that we can join the room (we should have received an
# invite) and can ban a user.
@@ -2168,7 +2164,7 @@ class MakeRoomAdminTestCase(unittest.HomeserverTestCase):
tok=self.admin_user_tok,
)
- def test_other_user(self):
+ def test_other_user(self) -> None:
"""Test that giving admin in a public room works to a non-admin user works."""
room_id = self.helper.create_room_as(
self.creator, tok=self.creator_tok, is_public=True
@@ -2181,7 +2177,7 @@ class MakeRoomAdminTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Now we test that we can join the room and ban a user.
self.helper.join(room_id, self.second_user_id, tok=self.second_tok)
@@ -2193,7 +2189,7 @@ class MakeRoomAdminTestCase(unittest.HomeserverTestCase):
tok=self.second_tok,
)
- def test_not_enough_power(self):
+ def test_not_enough_power(self) -> None:
"""Test that we get a sensible error if there are no local room admins."""
room_id = self.helper.create_room_as(
self.creator, tok=self.creator_tok, is_public=True
@@ -2215,11 +2211,11 @@ class MakeRoomAdminTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- # We expect this to fail with a 400 as there are no room admins.
+ # We expect this to fail with a HTTPStatus.BAD_REQUEST as there are no room admins.
#
# (Note we assert the error message to ensure that it's not denied for
# some other reason)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(
channel.json_body["error"],
"No local admin user in room with power to update power levels.",
@@ -2233,7 +2229,7 @@ class BlockRoomTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self._store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -2248,8 +2244,8 @@ class BlockRoomTestCase(unittest.HomeserverTestCase):
self.url = "/_synapse/admin/v1/rooms/%s/block"
@parameterized.expand([("PUT",), ("GET",)])
- def test_requester_is_no_admin(self, method: str):
- """If the user is not a server admin, an error 403 is returned."""
+ def test_requester_is_no_admin(self, method: str) -> None:
+ """If the user is not a server admin, an error HTTPStatus.FORBIDDEN is returned."""
channel = self.make_request(
method,
@@ -2262,8 +2258,8 @@ class BlockRoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand([("PUT",), ("GET",)])
- def test_room_is_not_valid(self, method: str):
- """Check that invalid room names, return an error 400."""
+ def test_room_is_not_valid(self, method: str) -> None:
+ """Check that invalid room names, return an error HTTPStatus.BAD_REQUEST."""
channel = self.make_request(
method,
@@ -2278,7 +2274,7 @@ class BlockRoomTestCase(unittest.HomeserverTestCase):
channel.json_body["error"],
)
- def test_block_is_not_valid(self):
+ def test_block_is_not_valid(self) -> None:
"""If parameter `block` is not valid, return an error."""
# `block` is not valid
@@ -2313,7 +2309,7 @@ class BlockRoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_JSON, channel.json_body["errcode"])
- def test_block_room(self):
+ def test_block_room(self) -> None:
"""Test that block a room is successful."""
def _request_and_test_block_room(room_id: str) -> None:
@@ -2337,7 +2333,7 @@ class BlockRoomTestCase(unittest.HomeserverTestCase):
# unknown remote room
_request_and_test_block_room("!unknown:remote")
- def test_block_room_twice(self):
+ def test_block_room_twice(self) -> None:
"""Test that block a room that is already blocked is successful."""
self._is_blocked(self.room_id, expect=False)
@@ -2352,7 +2348,7 @@ class BlockRoomTestCase(unittest.HomeserverTestCase):
self.assertTrue(channel.json_body["block"])
self._is_blocked(self.room_id, expect=True)
- def test_unblock_room(self):
+ def test_unblock_room(self) -> None:
"""Test that unblock a room is successful."""
def _request_and_test_unblock_room(room_id: str) -> None:
@@ -2377,7 +2373,7 @@ class BlockRoomTestCase(unittest.HomeserverTestCase):
# unknown remote room
_request_and_test_unblock_room("!unknown:remote")
- def test_unblock_room_twice(self):
+ def test_unblock_room_twice(self) -> None:
"""Test that unblock a room that is not blocked is successful."""
self._block_room(self.room_id)
@@ -2392,7 +2388,7 @@ class BlockRoomTestCase(unittest.HomeserverTestCase):
self.assertFalse(channel.json_body["block"])
self._is_blocked(self.room_id, expect=False)
- def test_get_blocked_room(self):
+ def test_get_blocked_room(self) -> None:
"""Test get status of a blocked room"""
def _request_blocked_room(room_id: str) -> None:
@@ -2416,7 +2412,7 @@ class BlockRoomTestCase(unittest.HomeserverTestCase):
# unknown remote room
_request_blocked_room("!unknown:remote")
- def test_get_unblocked_room(self):
+ def test_get_unblocked_room(self) -> None:
"""Test get status of a unblocked room"""
def _request_unblocked_room(room_id: str) -> None:
diff --git a/tests/rest/admin/test_server_notice.py b/tests/rest/admin/test_server_notice.py
index fbceba3254..3c59f5f766 100644
--- a/tests/rest/admin/test_server_notice.py
+++ b/tests/rest/admin/test_server_notice.py
@@ -11,14 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+from http import HTTPStatus
from typing import List
+from twisted.test.proto_helpers import MemoryReactor
+
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login, room, sync
+from synapse.server import HomeServer
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict
+from synapse.util import Clock
from tests import unittest
from tests.unittest import override_config
@@ -33,7 +37,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
sync.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastore()
self.room_shutdown_handler = hs.get_room_shutdown_handler()
self.pagination_handler = hs.get_pagination_handler()
@@ -48,14 +52,18 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
self.url = "/_synapse/admin/v1/send_server_notice"
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""Try to send a server notice without authentication."""
channel = self.make_request("POST", self.url)
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""If the user is not a server admin, an error is returned."""
channel = self.make_request(
"POST",
@@ -63,12 +71,16 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
access_token=self.other_user_token,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@override_config({"server_notices": {"system_mxid_localpart": "notices"}})
- def test_user_does_not_exist(self):
- """Tests that a lookup for a user that does not exist returns a 404"""
+ def test_user_does_not_exist(self) -> None:
+ """Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND"""
channel = self.make_request(
"POST",
self.url,
@@ -76,13 +88,13 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
content={"user_id": "@unknown_person:test", "content": ""},
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
@override_config({"server_notices": {"system_mxid_localpart": "notices"}})
- def test_user_is_not_local(self):
+ def test_user_is_not_local(self) -> None:
"""
- Tests that a lookup for a user that is not a local returns a 400
+ Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
channel = self.make_request(
"POST",
@@ -94,13 +106,13 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(
"Server notices can only be sent to local users", channel.json_body["error"]
)
@override_config({"server_notices": {"system_mxid_localpart": "notices"}})
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""If parameters are invalid, an error is returned."""
# no content, no user
@@ -110,7 +122,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_JSON, channel.json_body["errcode"])
# no content
@@ -121,7 +133,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
content={"user_id": self.other_user},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
# no body
@@ -132,7 +144,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
content={"user_id": self.other_user, "content": ""},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
self.assertEqual("'body' not in content", channel.json_body["error"])
@@ -144,11 +156,11 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
content={"user_id": self.other_user, "content": {"body": ""}},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
self.assertEqual("'msgtype' not in content", channel.json_body["error"])
- def test_server_notice_disabled(self):
+ def test_server_notice_disabled(self) -> None:
"""Tests that server returns error if server notice is disabled"""
channel = self.make_request(
"POST",
@@ -160,14 +172,14 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
self.assertEqual(
"Server notices are not enabled on this server", channel.json_body["error"]
)
@override_config({"server_notices": {"system_mxid_localpart": "notices"}})
- def test_send_server_notice(self):
+ def test_send_server_notice(self) -> None:
"""
Tests that sending two server notices is successfully,
the server uses the same room and do not send messages twice.
@@ -185,7 +197,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
"content": {"msgtype": "m.text", "body": "test msg one"},
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# user has one invite
invited_rooms = self._check_invite_and_join_status(self.other_user, 1, 0)
@@ -216,7 +228,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
"content": {"msgtype": "m.text", "body": "test msg two"},
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# user has no new invites or memberships
self._check_invite_and_join_status(self.other_user, 0, 1)
@@ -231,7 +243,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
self.assertEqual(messages[1]["sender"], "@notices:test")
@override_config({"server_notices": {"system_mxid_localpart": "notices"}})
- def test_send_server_notice_leave_room(self):
+ def test_send_server_notice_leave_room(self) -> None:
"""
Tests that sending a server notices is successfully.
The user leaves the room and the second message appears
@@ -250,7 +262,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
"content": {"msgtype": "m.text", "body": "test msg one"},
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# user has one invite
invited_rooms = self._check_invite_and_join_status(self.other_user, 1, 0)
@@ -293,7 +305,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
"content": {"msgtype": "m.text", "body": "test msg two"},
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# user has one invite
invited_rooms = self._check_invite_and_join_status(self.other_user, 1, 0)
@@ -315,7 +327,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
self.assertNotEqual(first_room_id, second_room_id)
@override_config({"server_notices": {"system_mxid_localpart": "notices"}})
- def test_send_server_notice_delete_room(self):
+ def test_send_server_notice_delete_room(self) -> None:
"""
Tests that the user get server notice in a new room
after the first server notice room was deleted.
@@ -333,7 +345,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
"content": {"msgtype": "m.text", "body": "test msg one"},
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# user has one invite
invited_rooms = self._check_invite_and_join_status(self.other_user, 1, 0)
@@ -382,7 +394,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
"content": {"msgtype": "m.text", "body": "test msg two"},
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# user has one invite
invited_rooms = self._check_invite_and_join_status(self.other_user, 1, 0)
@@ -405,7 +417,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
def _check_invite_and_join_status(
self, user_id: str, expected_invites: int, expected_memberships: int
- ) -> RoomsForUser:
+ ) -> List[RoomsForUser]:
"""Check invite and room membership status of a user.
Args
@@ -440,7 +452,7 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"GET", "/_matrix/client/r0/sync", access_token=token
)
- self.assertEqual(channel.code, 200)
+ self.assertEqual(channel.code, HTTPStatus.OK)
# Get the messages
room = channel.json_body["rooms"]["join"][room_id]
diff --git a/tests/rest/admin/test_statistics.py b/tests/rest/admin/test_statistics.py
index ece89a65ac..7cb8ec57ba 100644
--- a/tests/rest/admin/test_statistics.py
+++ b/tests/rest/admin/test_statistics.py
@@ -12,13 +12,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from http import HTTPStatus
+from typing import List, Optional
-import json
-from typing import Any, Dict, List, Optional
+from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.errors import Codes
from synapse.rest.client import login
+from synapse.server import HomeServer
+from synapse.types import JsonDict
+from synapse.util import Clock
from tests import unittest
from tests.test_utils import SMALL_PNG
@@ -30,7 +34,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.media_repo = hs.get_media_repository_resource()
self.admin_user = self.register_user("admin", "pass", admin=True)
@@ -41,30 +45,38 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
self.url = "/_synapse/admin/v1/statistics/users/media"
- def test_no_auth(self):
+ def test_no_auth(self) -> None:
"""
Try to list users without authentication.
"""
channel = self.make_request("GET", self.url, b"{}")
- self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.UNAUTHORIZED,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
- def test_requester_is_no_admin(self):
+ def test_requester_is_no_admin(self) -> None:
"""
- If the user is not a server admin, an error 403 is returned.
+ If the user is not a server admin, an error HTTPStatus.FORBIDDEN is returned.
"""
channel = self.make_request(
"GET",
self.url,
- json.dumps({}),
+ {},
access_token=self.other_user_tok,
)
- self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.FORBIDDEN,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
- def test_invalid_parameter(self):
+ def test_invalid_parameter(self) -> None:
"""
If parameters are invalid, an error is returned.
"""
@@ -75,7 +87,11 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# negative from
@@ -85,7 +101,11 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# negative limit
@@ -95,7 +115,11 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# negative from_ts
@@ -105,7 +129,11 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# negative until_ts
@@ -115,7 +143,11 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# until_ts smaller from_ts
@@ -125,7 +157,11 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# empty search term
@@ -135,7 +171,11 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# invalid search order
@@ -145,10 +185,14 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
- def test_limit(self):
+ def test_limit(self) -> None:
"""
Testing list of media with limit
"""
@@ -160,13 +204,13 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 10)
self.assertEqual(len(channel.json_body["users"]), 5)
self.assertEqual(channel.json_body["next_token"], 5)
self._check_fields(channel.json_body["users"])
- def test_from(self):
+ def test_from(self) -> None:
"""
Testing list of media with a defined starting point (from)
"""
@@ -178,13 +222,13 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(len(channel.json_body["users"]), 15)
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["users"])
- def test_limit_and_from(self):
+ def test_limit_and_from(self) -> None:
"""
Testing list of media with a defined starting point and limit
"""
@@ -196,13 +240,13 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
self.assertEqual(channel.json_body["next_token"], 15)
self.assertEqual(len(channel.json_body["users"]), 10)
self._check_fields(channel.json_body["users"])
- def test_next_token(self):
+ def test_next_token(self) -> None:
"""
Testing that `next_token` appears at the right place
"""
@@ -218,7 +262,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_users)
self.assertEqual(len(channel.json_body["users"]), number_users)
self.assertNotIn("next_token", channel.json_body)
@@ -231,7 +275,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_users)
self.assertEqual(len(channel.json_body["users"]), number_users)
self.assertNotIn("next_token", channel.json_body)
@@ -244,7 +288,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_users)
self.assertEqual(len(channel.json_body["users"]), 19)
self.assertEqual(channel.json_body["next_token"], 19)
@@ -257,12 +301,12 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_users)
self.assertEqual(len(channel.json_body["users"]), 1)
self.assertNotIn("next_token", channel.json_body)
- def test_no_media(self):
+ def test_no_media(self) -> None:
"""
Tests that a normal lookup for statistics is successfully
if users have no media created
@@ -274,11 +318,11 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["users"]))
- def test_order_by(self):
+ def test_order_by(self) -> None:
"""
Testing order list with parameter `order_by`
"""
@@ -356,7 +400,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
"b",
)
- def test_from_until_ts(self):
+ def test_from_until_ts(self) -> None:
"""
Testing filter by time with parameters `from_ts` and `until_ts`
"""
@@ -371,7 +415,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
self.url,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["users"][0]["media_count"], 3)
# filter media starting at `ts1` after creating first media
@@ -381,7 +425,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
self.url + "?from_ts=%s" % (ts1,),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 0)
self._create_media(self.other_user_tok, 3)
@@ -396,7 +440,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
self.url + "?from_ts=%s&until_ts=%s" % (ts1, ts2),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["users"][0]["media_count"], 3)
# filter media until `ts2` and earlier
@@ -405,10 +449,10 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
self.url + "?until_ts=%s" % (ts2,),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["users"][0]["media_count"], 6)
- def test_search_term(self):
+ def test_search_term(self) -> None:
self._create_users_with_media(20, 1)
# check without filter get all users
@@ -417,7 +461,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
self.url,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 20)
# filter user 1 and 10-19 by `user_id`
@@ -426,7 +470,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
self.url + "?search_term=foo_user_1",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 11)
# filter on this user in `displayname`
@@ -435,7 +479,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
self.url + "?search_term=bar_user_10",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["users"][0]["displayname"], "bar_user_10")
self.assertEqual(channel.json_body["total"], 1)
@@ -445,10 +489,10 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
self.url + "?search_term=foobar",
access_token=self.admin_user_tok,
)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 0)
- def _create_users_with_media(self, number_users: int, media_per_user: int):
+ def _create_users_with_media(self, number_users: int, media_per_user: int) -> None:
"""
Create a number of users with a number of media
Args:
@@ -460,7 +504,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
user_tok = self.login("foo_user_%s" % i, "pass")
self._create_media(user_tok, media_per_user)
- def _create_media(self, user_token: str, number_media: int):
+ def _create_media(self, user_token: str, number_media: int) -> None:
"""
Create a number of media for a specific user
Args:
@@ -471,10 +515,10 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
for _ in range(number_media):
# Upload some media into the room
self.helper.upload_media(
- upload_resource, SMALL_PNG, tok=user_token, expect_code=200
+ upload_resource, SMALL_PNG, tok=user_token, expect_code=HTTPStatus.OK
)
- def _check_fields(self, content: List[Dict[str, Any]]):
+ def _check_fields(self, content: List[JsonDict]) -> None:
"""Checks that all attributes are present in content
Args:
content: List that is checked for content
@@ -487,7 +531,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
def _order_test(
self, order_type: str, expected_user_list: List[str], dir: Optional[str] = None
- ):
+ ) -> None:
"""Request the list of users in a certain order. Assert that order is what
we expect
Args:
@@ -505,7 +549,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase):
url.encode("ascii"),
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], len(expected_user_list))
returned_order = [row["user_id"] for row in channel.json_body["users"]]
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 5011e54563..4fedd5fd08 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -17,6 +17,7 @@ import hmac
import os
import urllib.parse
from binascii import unhexlify
+from http import HTTPStatus
from typing import List, Optional
from unittest.mock import Mock, patch
@@ -74,7 +75,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
channel = self.make_request("POST", self.url, b"{}")
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(
"Shared secret registration is not enabled", channel.json_body["error"]
)
@@ -106,7 +107,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
body = {"nonce": nonce}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("username must be specified", channel.json_body["error"])
# 61 seconds
@@ -114,7 +115,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
channel = self.make_request("POST", self.url, body)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("unrecognised nonce", channel.json_body["error"])
def test_register_incorrect_nonce(self):
@@ -126,18 +127,18 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
want_mac.update(b"notthenonce\x00bob\x00abc123\x00admin")
- want_mac = want_mac.hexdigest()
+ want_mac_str = want_mac.hexdigest()
body = {
"nonce": nonce,
"username": "bob",
"password": "abc123",
"admin": True,
- "mac": want_mac,
+ "mac": want_mac_str,
}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual("HMAC incorrect", channel.json_body["error"])
def test_register_correct_nonce(self):
@@ -152,7 +153,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac.update(
nonce.encode("ascii") + b"\x00bob\x00abc123\x00admin\x00support"
)
- want_mac = want_mac.hexdigest()
+ want_mac_str = want_mac.hexdigest()
body = {
"nonce": nonce,
@@ -160,11 +161,11 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
"password": "abc123",
"admin": True,
"user_type": UserTypes.SUPPORT,
- "mac": want_mac,
+ "mac": want_mac_str,
}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["user_id"])
def test_nonce_reuse(self):
@@ -176,24 +177,24 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
want_mac.update(nonce.encode("ascii") + b"\x00bob\x00abc123\x00admin")
- want_mac = want_mac.hexdigest()
+ want_mac_str = want_mac.hexdigest()
body = {
"nonce": nonce,
"username": "bob",
"password": "abc123",
"admin": True,
- "mac": want_mac,
+ "mac": want_mac_str,
}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["user_id"])
# Now, try and reuse it
channel = self.make_request("POST", self.url, body)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("unrecognised nonce", channel.json_body["error"])
def test_missing_parts(self):
@@ -214,7 +215,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
# Must be an empty body present
channel = self.make_request("POST", self.url, {})
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("nonce must be specified", channel.json_body["error"])
#
@@ -224,28 +225,28 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
# Must be present
channel = self.make_request("POST", self.url, {"nonce": nonce()})
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("username must be specified", channel.json_body["error"])
# Must be a string
body = {"nonce": nonce(), "username": 1234}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Invalid username", channel.json_body["error"])
# Must not have null bytes
body = {"nonce": nonce(), "username": "abcd\u0000"}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Invalid username", channel.json_body["error"])
# Must not have null bytes
body = {"nonce": nonce(), "username": "a" * 1000}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Invalid username", channel.json_body["error"])
#
@@ -256,28 +257,28 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
body = {"nonce": nonce(), "username": "a"}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("password must be specified", channel.json_body["error"])
# Must be a string
body = {"nonce": nonce(), "username": "a", "password": 1234}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Invalid password", channel.json_body["error"])
# Must not have null bytes
body = {"nonce": nonce(), "username": "a", "password": "abcd\u0000"}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Invalid password", channel.json_body["error"])
# Super long
body = {"nonce": nonce(), "username": "a", "password": "A" * 1000}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Invalid password", channel.json_body["error"])
#
@@ -293,7 +294,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Invalid user type", channel.json_body["error"])
def test_displayname(self):
@@ -307,22 +308,22 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
want_mac.update(nonce.encode("ascii") + b"\x00bob1\x00abc123\x00notadmin")
- want_mac = want_mac.hexdigest()
+ want_mac_str = want_mac.hexdigest()
body = {
"nonce": nonce,
"username": "bob1",
"password": "abc123",
- "mac": want_mac,
+ "mac": want_mac_str,
}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob1:test", channel.json_body["user_id"])
channel = self.make_request("GET", "/profile/@bob1:test/displayname")
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("bob1", channel.json_body["displayname"])
# displayname is None
@@ -331,22 +332,22 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
want_mac.update(nonce.encode("ascii") + b"\x00bob2\x00abc123\x00notadmin")
- want_mac = want_mac.hexdigest()
+ want_mac_str = want_mac.hexdigest()
body = {
"nonce": nonce,
"username": "bob2",
"displayname": None,
"password": "abc123",
- "mac": want_mac,
+ "mac": want_mac_str,
}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob2:test", channel.json_body["user_id"])
channel = self.make_request("GET", "/profile/@bob2:test/displayname")
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("bob2", channel.json_body["displayname"])
# displayname is empty
@@ -355,22 +356,22 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
want_mac.update(nonce.encode("ascii") + b"\x00bob3\x00abc123\x00notadmin")
- want_mac = want_mac.hexdigest()
+ want_mac_str = want_mac.hexdigest()
body = {
"nonce": nonce,
"username": "bob3",
"displayname": "",
"password": "abc123",
- "mac": want_mac,
+ "mac": want_mac_str,
}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob3:test", channel.json_body["user_id"])
channel = self.make_request("GET", "/profile/@bob3:test/displayname")
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
# set displayname
channel = self.make_request("GET", self.url)
@@ -378,22 +379,22 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
want_mac.update(nonce.encode("ascii") + b"\x00bob4\x00abc123\x00notadmin")
- want_mac = want_mac.hexdigest()
+ want_mac_str = want_mac.hexdigest()
body = {
"nonce": nonce,
"username": "bob4",
"displayname": "Bob's Name",
"password": "abc123",
- "mac": want_mac,
+ "mac": want_mac_str,
}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob4:test", channel.json_body["user_id"])
channel = self.make_request("GET", "/profile/@bob4:test/displayname")
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("Bob's Name", channel.json_body["displayname"])
@override_config(
@@ -425,7 +426,7 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
want_mac.update(
nonce.encode("ascii") + b"\x00bob\x00abc123\x00admin\x00support"
)
- want_mac = want_mac.hexdigest()
+ want_mac_str = want_mac.hexdigest()
body = {
"nonce": nonce,
@@ -433,11 +434,11 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
"password": "abc123",
"admin": True,
"user_type": UserTypes.SUPPORT,
- "mac": want_mac,
+ "mac": want_mac_str,
}
channel = self.make_request("POST", self.url, body)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["user_id"])
@@ -461,7 +462,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
"""
channel = self.make_request("GET", self.url, b"{}")
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_requester_is_no_admin(self):
@@ -473,7 +474,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
channel = self.make_request("GET", self.url, access_token=other_user_token)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_all_users(self):
@@ -489,7 +490,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(3, len(channel.json_body["users"]))
self.assertEqual(3, channel.json_body["total"])
@@ -503,7 +504,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
expected_user_id: Optional[str],
search_term: str,
search_field: Optional[str] = "name",
- expected_http_code: Optional[int] = 200,
+ expected_http_code: Optional[int] = HTTPStatus.OK,
):
"""Search for a user and check that the returned user's id is a match
@@ -525,7 +526,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
)
self.assertEqual(expected_http_code, channel.code, msg=channel.json_body)
- if expected_http_code != 200:
+ if expected_http_code != HTTPStatus.OK:
return
# Check that users were returned
@@ -586,7 +587,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# negative from
@@ -596,7 +597,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# invalid guests
@@ -606,7 +607,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
# invalid deactivated
@@ -616,7 +617,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
# unkown order_by
@@ -626,7 +627,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
# invalid search order
@@ -636,7 +637,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
def test_limit(self):
@@ -654,7 +655,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_users)
self.assertEqual(len(channel.json_body["users"]), 5)
self.assertEqual(channel.json_body["next_token"], "5")
@@ -675,7 +676,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_users)
self.assertEqual(len(channel.json_body["users"]), 15)
self.assertNotIn("next_token", channel.json_body)
@@ -696,7 +697,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_users)
self.assertEqual(channel.json_body["next_token"], "15")
self.assertEqual(len(channel.json_body["users"]), 10)
@@ -719,7 +720,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_users)
self.assertEqual(len(channel.json_body["users"]), number_users)
self.assertNotIn("next_token", channel.json_body)
@@ -732,7 +733,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_users)
self.assertEqual(len(channel.json_body["users"]), number_users)
self.assertNotIn("next_token", channel.json_body)
@@ -745,7 +746,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_users)
self.assertEqual(len(channel.json_body["users"]), 19)
self.assertEqual(channel.json_body["next_token"], "19")
@@ -759,7 +760,7 @@ class UsersListTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_users)
self.assertEqual(len(channel.json_body["users"]), 1)
self.assertNotIn("next_token", channel.json_body)
@@ -862,14 +863,14 @@ class UsersListTestCase(unittest.HomeserverTestCase):
url,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], len(expected_user_list))
returned_order = [row["name"] for row in channel.json_body["users"]]
self.assertEqual(expected_user_list, returned_order)
self._check_fields(channel.json_body["users"])
- def _check_fields(self, content: JsonDict):
+ def _check_fields(self, content: List[JsonDict]):
"""Checks that the expected user attributes are present in content
Args:
content: List that is checked for content
@@ -936,7 +937,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
"""
channel = self.make_request("POST", self.url, b"{}")
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_requester_is_not_admin(self):
@@ -947,7 +948,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
channel = self.make_request("POST", url, access_token=self.other_user_token)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual("You are not a server admin", channel.json_body["error"])
channel = self.make_request(
@@ -957,12 +958,12 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
content=b"{}",
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual("You are not a server admin", channel.json_body["error"])
def test_user_does_not_exist(self):
"""
- Tests that deactivation for a user that does not exist returns a 404
+ Tests that deactivation for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
channel = self.make_request(
@@ -971,7 +972,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_erase_is_not_bool(self):
@@ -986,18 +987,18 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
def test_user_is_not_local(self):
"""
- Tests that deactivation for a user that is not a local returns a 400
+ Tests that deactivation for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
url = "/_synapse/admin/v1/deactivate/@unknown_person:unknown_domain"
channel = self.make_request("POST", url, access_token=self.admin_user_tok)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only deactivate local users", channel.json_body["error"])
def test_deactivate_user_erase_true(self):
@@ -1012,7 +1013,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(False, channel.json_body["deactivated"])
self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"])
@@ -1027,7 +1028,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
content={"erase": True},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Get user
channel = self.make_request(
@@ -1036,7 +1037,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(True, channel.json_body["deactivated"])
self.assertEqual(0, len(channel.json_body["threepids"]))
@@ -1057,7 +1058,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(False, channel.json_body["deactivated"])
self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"])
@@ -1072,7 +1073,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
content={"erase": False},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Get user
channel = self.make_request(
@@ -1081,7 +1082,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(True, channel.json_body["deactivated"])
self.assertEqual(0, len(channel.json_body["threepids"]))
@@ -1111,7 +1112,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(False, channel.json_body["deactivated"])
self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"])
@@ -1126,7 +1127,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
content={"erase": True},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Get user
channel = self.make_request(
@@ -1135,7 +1136,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(True, channel.json_body["deactivated"])
self.assertEqual(0, len(channel.json_body["threepids"]))
@@ -1195,7 +1196,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.other_user_token,
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual("You are not a server admin", channel.json_body["error"])
channel = self.make_request(
@@ -1205,12 +1206,12 @@ class UserRestTestCase(unittest.HomeserverTestCase):
content=b"{}",
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual("You are not a server admin", channel.json_body["error"])
def test_user_does_not_exist(self):
"""
- Tests that a lookup for a user that does not exist returns a 404
+ Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
channel = self.make_request(
@@ -1219,7 +1220,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual("M_NOT_FOUND", channel.json_body["errcode"])
def test_invalid_parameter(self):
@@ -1234,7 +1235,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"admin": "not_bool"},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"])
# deactivated not bool
@@ -1244,7 +1245,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"deactivated": "not_bool"},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
# password not str
@@ -1254,7 +1255,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"password": True},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
# password not length
@@ -1264,7 +1265,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"password": "x" * 513},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
# user_type not valid
@@ -1274,7 +1275,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"user_type": "new type"},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
# external_ids not valid
@@ -1286,7 +1287,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
"external_ids": {"auth_provider": "prov", "wrong_external_id": "id"}
},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
channel = self.make_request(
@@ -1295,7 +1296,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"external_ids": {"external_id": "id"}},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
# threepids not valid
@@ -1305,7 +1306,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"threepids": {"medium": "email", "wrong_address": "id"}},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
channel = self.make_request(
@@ -1314,7 +1315,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"threepids": {"address": "value"}},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
def test_get_user(self):
@@ -1327,7 +1328,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual("User", channel.json_body["displayname"])
self._check_fields(channel.json_body)
@@ -1370,7 +1371,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["name"])
self.assertEqual("Bob's name", channel.json_body["displayname"])
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
@@ -1433,7 +1434,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["name"])
self.assertEqual("Bob's name", channel.json_body["displayname"])
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
@@ -1461,9 +1462,9 @@ class UserRestTestCase(unittest.HomeserverTestCase):
# before limit of monthly active users is reached
channel = self.make_request("GET", "/sync", access_token=self.admin_user_tok)
- if channel.code != 200:
+ if channel.code != HTTPStatus.OK:
raise HttpResponseException(
- channel.code, channel.result["reason"], channel.result["body"]
+ channel.code, channel.result["reason"], channel.json_body
)
# Set monthly active users to the limit
@@ -1625,7 +1626,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
content={"password": "hahaha"},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self._check_fields(channel.json_body)
def test_set_displayname(self):
@@ -1641,7 +1642,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
content={"displayname": "foobar"},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual("foobar", channel.json_body["displayname"])
@@ -1652,7 +1653,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual("foobar", channel.json_body["displayname"])
@@ -1674,7 +1675,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(2, len(channel.json_body["threepids"]))
# result does not always have the same sort order, therefore it becomes sorted
@@ -1700,7 +1701,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(2, len(channel.json_body["threepids"]))
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
@@ -1716,7 +1717,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(2, len(channel.json_body["threepids"]))
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
@@ -1732,7 +1733,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"threepids": []},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(0, len(channel.json_body["threepids"]))
self._check_fields(channel.json_body)
@@ -1759,7 +1760,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(first_user, channel.json_body["name"])
self.assertEqual(1, len(channel.json_body["threepids"]))
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
@@ -1778,7 +1779,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(1, len(channel.json_body["threepids"]))
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
@@ -1800,7 +1801,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
)
# other user has this two threepids
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(2, len(channel.json_body["threepids"]))
# result does not always have the same sort order, therefore it becomes sorted
@@ -1819,7 +1820,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
url_first_user,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(first_user, channel.json_body["name"])
self.assertEqual(0, len(channel.json_body["threepids"]))
self._check_fields(channel.json_body)
@@ -1848,7 +1849,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(2, len(channel.json_body["external_ids"]))
# result does not always have the same sort order, therefore it becomes sorted
@@ -1880,7 +1881,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(2, len(channel.json_body["external_ids"]))
self.assertEqual(
@@ -1899,7 +1900,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(2, len(channel.json_body["external_ids"]))
self.assertEqual(
@@ -1918,7 +1919,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"external_ids": []},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(0, len(channel.json_body["external_ids"]))
@@ -1947,7 +1948,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(first_user, channel.json_body["name"])
self.assertEqual(1, len(channel.json_body["external_ids"]))
self.assertEqual(
@@ -1973,7 +1974,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(1, len(channel.json_body["external_ids"]))
self.assertEqual(
@@ -2005,7 +2006,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
)
# must fail
- self.assertEqual(409, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.CONFLICT, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
self.assertEqual("External id is already in use.", channel.json_body["error"])
@@ -2016,7 +2017,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(1, len(channel.json_body["external_ids"]))
self.assertEqual(
@@ -2034,7 +2035,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(first_user, channel.json_body["name"])
self.assertEqual(1, len(channel.json_body["external_ids"]))
self.assertEqual(
@@ -2065,7 +2066,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertFalse(channel.json_body["deactivated"])
self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"])
@@ -2080,7 +2081,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
content={"deactivated": True},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertTrue(channel.json_body["deactivated"])
self.assertIsNone(channel.json_body["password_hash"])
@@ -2096,7 +2097,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertTrue(channel.json_body["deactivated"])
self.assertIsNone(channel.json_body["password_hash"])
@@ -2123,7 +2124,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
content={"deactivated": True},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertTrue(channel.json_body["deactivated"])
@@ -2139,7 +2140,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
content={"displayname": "Foobar"},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertTrue(channel.json_body["deactivated"])
self.assertEqual("Foobar", channel.json_body["displayname"])
@@ -2163,7 +2164,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"deactivated": False},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
# Reactivate the user.
channel = self.make_request(
@@ -2172,7 +2173,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"deactivated": False, "password": "foo"},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertFalse(channel.json_body["deactivated"])
self.assertIsNotNone(channel.json_body["password_hash"])
@@ -2194,7 +2195,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"deactivated": False, "password": "foo"},
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
# Reactivate the user without a password.
@@ -2204,7 +2205,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"deactivated": False},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertFalse(channel.json_body["deactivated"])
self.assertIsNone(channel.json_body["password_hash"])
@@ -2226,7 +2227,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"deactivated": False, "password": "foo"},
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
# Reactivate the user without a password.
@@ -2236,7 +2237,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"deactivated": False},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertFalse(channel.json_body["deactivated"])
self.assertIsNone(channel.json_body["password_hash"])
@@ -2255,7 +2256,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
content={"admin": True},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertTrue(channel.json_body["admin"])
@@ -2266,7 +2267,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertTrue(channel.json_body["admin"])
@@ -2283,7 +2284,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
content={"user_type": UserTypes.SUPPORT},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(UserTypes.SUPPORT, channel.json_body["user_type"])
@@ -2294,7 +2295,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(UserTypes.SUPPORT, channel.json_body["user_type"])
@@ -2306,7 +2307,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
content={"user_type": None},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertIsNone(channel.json_body["user_type"])
@@ -2317,7 +2318,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@user:test", channel.json_body["name"])
self.assertIsNone(channel.json_body["user_type"])
@@ -2347,7 +2348,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["name"])
self.assertEqual("bob", channel.json_body["displayname"])
self.assertEqual(0, channel.json_body["deactivated"])
@@ -2360,7 +2361,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
content={"password": "abc123", "deactivated": "false"},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
# Check user is not deactivated
channel = self.make_request(
@@ -2369,7 +2370,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual("@bob:test", channel.json_body["name"])
self.assertEqual("bob", channel.json_body["displayname"])
@@ -2394,7 +2395,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"deactivated": True},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertTrue(channel.json_body["deactivated"])
self.assertIsNone(channel.json_body["password_hash"])
self._is_erased(user_id, False)
@@ -2445,7 +2446,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
"""
channel = self.make_request("GET", self.url, b"{}")
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_requester_is_no_admin(self):
@@ -2460,7 +2461,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
access_token=other_user_token,
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_user_does_not_exist(self):
@@ -2474,7 +2475,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["joined_rooms"]))
@@ -2490,7 +2491,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["joined_rooms"]))
@@ -2506,7 +2507,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["joined_rooms"]))
@@ -2527,7 +2528,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(number_rooms, channel.json_body["total"])
self.assertEqual(number_rooms, len(channel.json_body["joined_rooms"]))
@@ -2574,7 +2575,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(1, channel.json_body["total"])
self.assertEqual([local_and_remote_room_id], channel.json_body["joined_rooms"])
@@ -2603,7 +2604,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
"""
channel = self.make_request("GET", self.url, b"{}")
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_requester_is_no_admin(self):
@@ -2618,12 +2619,12 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
access_token=other_user_token,
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_user_does_not_exist(self):
"""
- Tests that a lookup for a user that does not exist returns a 404
+ Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
url = "/_synapse/admin/v1/users/@unknown_person:test/pushers"
channel = self.make_request(
@@ -2632,12 +2633,12 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_user_is_not_local(self):
"""
- Tests that a lookup for a user that is not a local returns a 400
+ Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
url = "/_synapse/admin/v1/users/@unknown_person:unknown_domain/pushers"
@@ -2647,7 +2648,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only look up local users", channel.json_body["error"])
def test_get_pushers(self):
@@ -2662,7 +2663,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
# Register the pusher
@@ -2693,7 +2694,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(1, channel.json_body["total"])
for p in channel.json_body["pushers"]:
@@ -2732,7 +2733,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
"""Try to list media of an user without authentication."""
channel = self.make_request(method, self.url, {})
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "DELETE"])
@@ -2746,12 +2747,12 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=other_user_token,
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "DELETE"])
def test_user_does_not_exist(self, method: str):
- """Tests that a lookup for a user that does not exist returns a 404"""
+ """Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND"""
url = "/_synapse/admin/v1/users/@unknown_person:test/media"
channel = self.make_request(
method,
@@ -2759,12 +2760,12 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
@parameterized.expand(["GET", "DELETE"])
def test_user_is_not_local(self, method: str):
- """Tests that a lookup for a user that is not a local returns a 400"""
+ """Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST"""
url = "/_synapse/admin/v1/users/@unknown_person:unknown_domain/media"
channel = self.make_request(
@@ -2773,7 +2774,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only look up local users", channel.json_body["error"])
def test_limit_GET(self):
@@ -2789,7 +2790,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), 5)
self.assertEqual(channel.json_body["next_token"], 5)
@@ -2808,7 +2809,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 5)
self.assertEqual(len(channel.json_body["deleted_media"]), 5)
@@ -2825,7 +2826,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), 15)
self.assertNotIn("next_token", channel.json_body)
@@ -2844,7 +2845,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 15)
self.assertEqual(len(channel.json_body["deleted_media"]), 15)
@@ -2861,7 +2862,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(channel.json_body["next_token"], 15)
self.assertEqual(len(channel.json_body["media"]), 10)
@@ -2880,7 +2881,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], 10)
self.assertEqual(len(channel.json_body["deleted_media"]), 10)
@@ -2894,7 +2895,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
# invalid search order
@@ -2904,7 +2905,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
# negative limit
@@ -2914,7 +2915,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# negative from
@@ -2924,7 +2925,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
def test_next_token(self):
@@ -2947,7 +2948,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), number_media)
self.assertNotIn("next_token", channel.json_body)
@@ -2960,7 +2961,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), number_media)
self.assertNotIn("next_token", channel.json_body)
@@ -2973,7 +2974,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), 19)
self.assertEqual(channel.json_body["next_token"], 19)
@@ -2987,7 +2988,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), 1)
self.assertNotIn("next_token", channel.json_body)
@@ -3004,7 +3005,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["media"]))
@@ -3019,7 +3020,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["deleted_media"]))
@@ -3036,7 +3037,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(number_media, channel.json_body["total"])
self.assertEqual(number_media, len(channel.json_body["media"]))
self.assertNotIn("next_token", channel.json_body)
@@ -3062,7 +3063,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(number_media, channel.json_body["total"])
self.assertEqual(number_media, len(channel.json_body["deleted_media"]))
self.assertCountEqual(channel.json_body["deleted_media"], media_ids)
@@ -3207,7 +3208,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
# Upload some media into the room
response = self.helper.upload_media(
- upload_resource, image_data, user_token, filename, expect_code=200
+ upload_resource, image_data, user_token, filename, expect_code=HTTPStatus.OK
)
# Extract media ID from the response
@@ -3225,16 +3226,16 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
)
self.assertEqual(
- 200,
+ HTTPStatus.OK,
channel.code,
msg=(
- f"Expected to receive a 200 on accessing media: {server_and_media_id}"
+ f"Expected to receive a HTTPStatus.OK on accessing media: {server_and_media_id}"
),
)
return media_id
- def _check_fields(self, content: JsonDict):
+ def _check_fields(self, content: List[JsonDict]):
"""Checks that the expected user attributes are present in content
Args:
content: List that is checked for content
@@ -3274,7 +3275,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase):
url,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(channel.json_body["total"], len(expected_media_list))
returned_order = [row["media_id"] for row in channel.json_body["media"]]
@@ -3310,14 +3311,14 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"POST", self.url, b"{}", access_token=self.admin_user_tok
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
return channel.json_body["access_token"]
def test_no_auth(self):
"""Try to login as a user without authentication."""
channel = self.make_request("POST", self.url, b"{}")
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_not_admin(self):
@@ -3326,7 +3327,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
"POST", self.url, b"{}", access_token=self.other_user_tok
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
def test_send_event(self):
"""Test that sending event as a user works."""
@@ -3351,7 +3352,7 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"GET", "devices", b"{}", access_token=self.other_user_tok
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# We should only see the one device (from the login in `prepare`)
self.assertEqual(len(channel.json_body["devices"]), 1)
@@ -3363,21 +3364,21 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
# Test that we can successfully make a request
channel = self.make_request("GET", "devices", b"{}", access_token=puppet_token)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Logout with the puppet token
channel = self.make_request("POST", "logout", b"{}", access_token=puppet_token)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# The puppet token should no longer work
channel = self.make_request("GET", "devices", b"{}", access_token=puppet_token)
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
# .. but the real user's tokens should still work
channel = self.make_request(
"GET", "devices", b"{}", access_token=self.other_user_tok
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
def test_user_logout_all(self):
"""Tests that the target user calling `/logout/all` does *not* expire
@@ -3388,23 +3389,23 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
# Test that we can successfully make a request
channel = self.make_request("GET", "devices", b"{}", access_token=puppet_token)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Logout all with the real user token
channel = self.make_request(
"POST", "logout/all", b"{}", access_token=self.other_user_tok
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# The puppet token should still work
channel = self.make_request("GET", "devices", b"{}", access_token=puppet_token)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# .. but the real user's tokens shouldn't
channel = self.make_request(
"GET", "devices", b"{}", access_token=self.other_user_tok
)
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
def test_admin_logout_all(self):
"""Tests that the admin user calling `/logout/all` does expire the
@@ -3415,23 +3416,23 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
# Test that we can successfully make a request
channel = self.make_request("GET", "devices", b"{}", access_token=puppet_token)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# Logout all with the admin user token
channel = self.make_request(
"POST", "logout/all", b"{}", access_token=self.admin_user_tok
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
# The puppet token should no longer work
channel = self.make_request("GET", "devices", b"{}", access_token=puppet_token)
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
# .. but the real user's tokens should still work
channel = self.make_request(
"GET", "devices", b"{}", access_token=self.other_user_tok
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
@unittest.override_config(
{
@@ -3459,7 +3460,10 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
# Now unaccept it and check that we can't send an event
self.get_success(self.store.user_set_consent_version(self.other_user, "0.0"))
self.helper.send_event(
- room_id, "com.example.test", tok=self.other_user_tok, expect_code=403
+ room_id,
+ "com.example.test",
+ tok=self.other_user_tok,
+ expect_code=HTTPStatus.FORBIDDEN,
)
# Login in as the user
@@ -3477,7 +3481,10 @@ class UserTokenRestTestCase(unittest.HomeserverTestCase):
# Trying to join as the other user should fail due to reaching MAU limit.
self.helper.join(
- room_id, user=self.other_user, tok=self.other_user_tok, expect_code=403
+ room_id,
+ user=self.other_user,
+ tok=self.other_user_tok,
+ expect_code=HTTPStatus.FORBIDDEN,
)
# Logging in as the other user and joining a room should work, even
@@ -3512,7 +3519,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
Try to get information of an user without authentication.
"""
channel = self.make_request("GET", self.url, b"{}")
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_requester_is_not_admin(self):
@@ -3527,12 +3534,12 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.url,
access_token=other_user2_token,
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_user_is_not_local(self):
"""
- Tests that a lookup for a user that is not a local returns a 400
+ Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
url = self.url_prefix % "@unknown_person:unknown_domain"
@@ -3541,7 +3548,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
url,
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual("Can only whois a local user", channel.json_body["error"])
def test_get_whois_admin(self):
@@ -3553,7 +3560,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.url,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(self.other_user, channel.json_body["user_id"])
self.assertIn("devices", channel.json_body)
@@ -3568,7 +3575,7 @@ class WhoisRestTestCase(unittest.HomeserverTestCase):
self.url,
access_token=other_user_token,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(self.other_user, channel.json_body["user_id"])
self.assertIn("devices", channel.json_body)
@@ -3598,7 +3605,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
Try to get information of an user without authentication.
"""
channel = self.make_request(method, self.url)
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["POST", "DELETE"])
@@ -3609,18 +3616,18 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
other_user_token = self.login("user", "pass")
channel = self.make_request(method, self.url, access_token=other_user_token)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand(["POST", "DELETE"])
def test_user_is_not_local(self, method: str):
"""
- Tests that shadow-banning for a user that is not a local returns a 400
+ Tests that shadow-banning for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
url = "/_synapse/admin/v1/whois/@unknown_person:unknown_domain"
channel = self.make_request(method, url, access_token=self.admin_user_tok)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
def test_success(self):
"""
@@ -3632,7 +3639,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
self.assertFalse(result.shadow_banned)
channel = self.make_request("POST", self.url, access_token=self.admin_user_tok)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual({}, channel.json_body)
# Ensure the user is shadow-banned (and the cache was cleared).
@@ -3643,7 +3650,7 @@ class ShadowBanRestTestCase(unittest.HomeserverTestCase):
channel = self.make_request(
"DELETE", self.url, access_token=self.admin_user_tok
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual({}, channel.json_body)
# Ensure the user is no longer shadow-banned (and the cache was cleared).
@@ -3677,7 +3684,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
"""
channel = self.make_request(method, self.url, b"{}")
- self.assertEqual(401, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.UNAUTHORIZED, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "POST", "DELETE"])
@@ -3693,13 +3700,13 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
access_token=other_user_token,
)
- self.assertEqual(403, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
@parameterized.expand(["GET", "POST", "DELETE"])
def test_user_does_not_exist(self, method: str):
"""
- Tests that a lookup for a user that does not exist returns a 404
+ Tests that a lookup for a user that does not exist returns a HTTPStatus.NOT_FOUND
"""
url = "/_synapse/admin/v1/users/@unknown_person:test/override_ratelimit"
@@ -3709,7 +3716,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(404, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
@parameterized.expand(
@@ -3721,7 +3728,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
)
def test_user_is_not_local(self, method: str, error_msg: str):
"""
- Tests that a lookup for a user that is not a local returns a 400
+ Tests that a lookup for a user that is not a local returns a HTTPStatus.BAD_REQUEST
"""
url = (
"/_synapse/admin/v1/users/@unknown_person:unknown_domain/override_ratelimit"
@@ -3733,7 +3740,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(error_msg, channel.json_body["error"])
def test_invalid_parameter(self):
@@ -3748,7 +3755,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
content={"messages_per_second": "string"},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# messages_per_second is negative
@@ -3759,7 +3766,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
content={"messages_per_second": -1},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# burst_count is a string
@@ -3770,7 +3777,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
content={"burst_count": "string"},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
# burst_count is negative
@@ -3781,7 +3788,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
content={"burst_count": -1},
)
- self.assertEqual(400, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
def test_return_zero_when_null(self):
@@ -3806,7 +3813,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.url,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["messages_per_second"])
self.assertEqual(0, channel.json_body["burst_count"])
@@ -3820,7 +3827,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.url,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertNotIn("messages_per_second", channel.json_body)
self.assertNotIn("burst_count", channel.json_body)
@@ -3831,7 +3838,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"messages_per_second": 10, "burst_count": 11},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(10, channel.json_body["messages_per_second"])
self.assertEqual(11, channel.json_body["burst_count"])
@@ -3842,7 +3849,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
access_token=self.admin_user_tok,
content={"messages_per_second": 20, "burst_count": 21},
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(20, channel.json_body["messages_per_second"])
self.assertEqual(21, channel.json_body["burst_count"])
@@ -3852,7 +3859,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.url,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertEqual(20, channel.json_body["messages_per_second"])
self.assertEqual(21, channel.json_body["burst_count"])
@@ -3862,7 +3869,7 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.url,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertNotIn("messages_per_second", channel.json_body)
self.assertNotIn("burst_count", channel.json_body)
@@ -3872,6 +3879,6 @@ class RateLimitTestCase(unittest.HomeserverTestCase):
self.url,
access_token=self.admin_user_tok,
)
- self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertNotIn("messages_per_second", channel.json_body)
self.assertNotIn("burst_count", channel.json_body)
diff --git a/tests/rest/admin/test_username_available.py b/tests/rest/admin/test_username_available.py
index 4e1c49c28b..7978626e71 100644
--- a/tests/rest/admin/test_username_available.py
+++ b/tests/rest/admin/test_username_available.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from http import HTTPStatus
+
import synapse.rest.admin
from synapse.api.errors import Codes, SynapseError
from synapse.rest.client import login
@@ -33,30 +35,38 @@ class UsernameAvailableTestCase(unittest.HomeserverTestCase):
async def check_username(username):
if username == "allowed":
return True
- raise SynapseError(400, "User ID already taken.", errcode=Codes.USER_IN_USE)
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "User ID already taken.",
+ errcode=Codes.USER_IN_USE,
+ )
handler = self.hs.get_registration_handler()
handler.check_username = check_username
def test_username_available(self):
"""
- The endpoint should return a 200 response if the username does not exist
+ The endpoint should return a HTTPStatus.OK response if the username does not exist
"""
url = "%s?username=%s" % (self.url, "allowed")
channel = self.make_request("GET", url, None, self.admin_user_tok)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
self.assertTrue(channel.json_body["available"])
def test_username_unavailable(self):
"""
- The endpoint should return a 200 response if the username does not exist
+ The endpoint should return a HTTPStatus.OK response if the username does not exist
"""
url = "%s?username=%s" % (self.url, "disallowed")
channel = self.make_request("GET", url, None, self.admin_user_tok)
- self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(
+ HTTPStatus.BAD_REQUEST,
+ channel.code,
+ msg=channel.json_body,
+ )
self.assertEqual(channel.json_body["errcode"], "M_USER_IN_USE")
self.assertEqual(channel.json_body["error"], "User ID already taken.")
|