diff --git a/tests/rest/admin/test_media.py b/tests/rest/admin/test_media.py
index f741121ea2..6fee0f95b6 100644
--- a/tests/rest/admin/test_media.py
+++ b/tests/rest/admin/test_media.py
@@ -566,6 +566,134 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
self.assertFalse(os.path.exists(local_path))
+class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase):
+
+ servlets = [
+ synapse.rest.admin.register_servlets,
+ synapse.rest.admin.register_servlets_for_media_repo,
+ login.register_servlets,
+ ]
+
+ def prepare(self, reactor, clock, hs):
+ media_repo = hs.get_media_repository_resource()
+ self.store = hs.get_datastore()
+ self.server_name = hs.hostname
+
+ self.admin_user = self.register_user("admin", "pass", admin=True)
+ self.admin_user_tok = self.login("admin", "pass")
+
+ # Create media
+ upload_resource = media_repo.children[b"upload"]
+ # file size is 67 Byte
+ image_data = unhexlify(
+ b"89504e470d0a1a0a0000000d4948445200000001000000010806"
+ b"0000001f15c4890000000a49444154789c63000100000500010d"
+ b"0a2db40000000049454e44ae426082"
+ )
+
+ # Upload some media into the room
+ response = self.helper.upload_media(
+ upload_resource, image_data, tok=self.admin_user_tok, expect_code=200
+ )
+ # Extract media ID from the response
+ server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
+ self.media_id = server_and_media_id.split("/")[1]
+
+ self.url = "/_synapse/admin/v1/media/%s/%s/%s"
+
+ @parameterized.expand(["quarantine", "unquarantine"])
+ def test_no_auth(self, action: str):
+ """
+ Try to protect media without authentication.
+ """
+
+ channel = self.make_request(
+ "POST",
+ self.url % (action, self.server_name, self.media_id),
+ b"{}",
+ )
+
+ self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
+
+ @parameterized.expand(["quarantine", "unquarantine"])
+ def test_requester_is_no_admin(self, action: str):
+ """
+ If the user is not a server admin, an error is returned.
+ """
+ self.other_user = self.register_user("user", "pass")
+ self.other_user_token = self.login("user", "pass")
+
+ channel = self.make_request(
+ "POST",
+ self.url % (action, self.server_name, self.media_id),
+ access_token=self.other_user_token,
+ )
+
+ self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+ def test_quarantine_media(self):
+ """
+ Tests that quarantining and remove from quarantine a media is successfully
+ """
+
+ media_info = self.get_success(self.store.get_local_media(self.media_id))
+ self.assertFalse(media_info["quarantined_by"])
+
+ # quarantining
+ channel = self.make_request(
+ "POST",
+ self.url % ("quarantine", self.server_name, self.media_id),
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertFalse(channel.json_body)
+
+ media_info = self.get_success(self.store.get_local_media(self.media_id))
+ self.assertTrue(media_info["quarantined_by"])
+
+ # remove from quarantine
+ channel = self.make_request(
+ "POST",
+ self.url % ("unquarantine", self.server_name, self.media_id),
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertFalse(channel.json_body)
+
+ media_info = self.get_success(self.store.get_local_media(self.media_id))
+ self.assertFalse(media_info["quarantined_by"])
+
+ def test_quarantine_protected_media(self):
+ """
+ Tests that quarantining from protected media fails
+ """
+
+ # protect
+ self.get_success(self.store.mark_local_media_as_safe(self.media_id, safe=True))
+
+ # verify protection
+ media_info = self.get_success(self.store.get_local_media(self.media_id))
+ self.assertTrue(media_info["safe_from_quarantine"])
+
+ # quarantining
+ channel = self.make_request(
+ "POST",
+ self.url % ("quarantine", self.server_name, self.media_id),
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertFalse(channel.json_body)
+
+ # verify that is not in quarantine
+ media_info = self.get_success(self.store.get_local_media(self.media_id))
+ self.assertFalse(media_info["quarantined_by"])
+
+
class ProtectMediaByIDTestCase(unittest.HomeserverTestCase):
servlets = [
|