diff --git a/tests/rest/admin/test_media.py b/tests/rest/admin/test_media.py
index f378165513..da0e9749aa 100644
--- a/tests/rest/admin/test_media.py
+++ b/tests/rest/admin/test_media.py
@@ -35,7 +35,8 @@ from synapse.server import HomeServer
from synapse.util import Clock
from tests import unittest
-from tests.test_utils import SMALL_PNG
+from tests.test_utils import SMALL_CMYK_JPEG, SMALL_PNG
+from tests.unittest import override_config
VALID_TIMESTAMP = 1609459200000 # 2021-01-01 in milliseconds
INVALID_TIMESTAMP_IN_S = 1893456000 # 2030-01-01 in seconds
@@ -126,6 +127,7 @@ class DeleteMediaByIDTestCase(_AdminMediaTests):
self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("Can only delete local media", channel.json_body["error"])
+ @override_config({"enable_authenticated_media": False})
def test_delete_media(self) -> None:
"""
Tests that delete a media is successfully
@@ -371,6 +373,7 @@ class DeleteMediaByDateSizeTestCase(_AdminMediaTests):
self._access_media(server_and_media_id, False)
+ @override_config({"enable_authenticated_media": False})
def test_keep_media_by_date(self) -> None:
"""
Tests that media is not deleted if it is newer than `before_ts`
@@ -408,6 +411,7 @@ class DeleteMediaByDateSizeTestCase(_AdminMediaTests):
self._access_media(server_and_media_id, False)
+ @override_config({"enable_authenticated_media": False})
def test_keep_media_by_size(self) -> None:
"""
Tests that media is not deleted if its size is smaller than or equal
@@ -443,6 +447,7 @@ class DeleteMediaByDateSizeTestCase(_AdminMediaTests):
self._access_media(server_and_media_id, False)
+ @override_config({"enable_authenticated_media": False})
def test_keep_media_by_user_avatar(self) -> None:
"""
Tests that we do not delete media if is used as a user avatar
@@ -487,6 +492,7 @@ class DeleteMediaByDateSizeTestCase(_AdminMediaTests):
self._access_media(server_and_media_id, False)
+ @override_config({"enable_authenticated_media": False})
def test_keep_media_by_room_avatar(self) -> None:
"""
Tests that we do not delete media if it is used as a room avatar
@@ -592,23 +598,27 @@ class DeleteMediaByDateSizeTestCase(_AdminMediaTests):
class QuarantineMediaByIDTestCase(_AdminMediaTests):
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.store = hs.get_datastores().main
- self.server_name = hs.hostname
-
- self.admin_user = self.register_user("admin", "pass", admin=True)
- self.admin_user_tok = self.login("admin", "pass")
-
+ def upload_media_and_return_media_id(self, data: bytes) -> str:
# Upload some media into the room
response = self.helper.upload_media(
- SMALL_PNG,
+ data,
tok=self.admin_user_tok,
expect_code=200,
)
# Extract media ID from the response
server_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
- self.media_id = server_and_media_id.split("/")[1]
+ return server_and_media_id.split("/")[1]
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.store = hs.get_datastores().main
+ self.server_name = hs.hostname
+ self.admin_user = self.register_user("admin", "pass", admin=True)
+ self.admin_user_tok = self.login("admin", "pass")
+ self.media_id = self.upload_media_and_return_media_id(SMALL_PNG)
+ self.media_id_2 = self.upload_media_and_return_media_id(SMALL_PNG)
+ self.media_id_3 = self.upload_media_and_return_media_id(SMALL_PNG)
+ self.media_id_other = self.upload_media_and_return_media_id(SMALL_CMYK_JPEG)
self.url = "/_synapse/admin/v1/media/%s/%s/%s"
@parameterized.expand(["quarantine", "unquarantine"])
@@ -680,6 +690,52 @@ class QuarantineMediaByIDTestCase(_AdminMediaTests):
assert media_info is not None
self.assertFalse(media_info.quarantined_by)
+ def test_quarantine_media_match_hash(self) -> None:
+ """
+ Tests that quarantining removes all media with the same hash
+ """
+
+ media_info = self.get_success(self.store.get_local_media(self.media_id))
+ assert media_info is not None
+ self.assertFalse(media_info.quarantined_by)
+
+ # quarantining
+ channel = self.make_request(
+ "POST",
+ self.url % ("quarantine", self.server_name, self.media_id),
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertFalse(channel.json_body)
+
+ # Test that ALL similar media was quarantined.
+ for media in [self.media_id, self.media_id_2, self.media_id_3]:
+ media_info = self.get_success(self.store.get_local_media(media))
+ assert media_info is not None
+ self.assertTrue(media_info.quarantined_by)
+
+ # Test that other media was not.
+ media_info = self.get_success(self.store.get_local_media(self.media_id_other))
+ assert media_info is not None
+ self.assertFalse(media_info.quarantined_by)
+
+ # remove from quarantine
+ channel = self.make_request(
+ "POST",
+ self.url % ("unquarantine", self.server_name, self.media_id),
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(200, channel.code, msg=channel.json_body)
+ self.assertFalse(channel.json_body)
+
+ # Test that ALL similar media is now reset.
+ for media in [self.media_id, self.media_id_2, self.media_id_3]:
+ media_info = self.get_success(self.store.get_local_media(media))
+ assert media_info is not None
+ self.assertFalse(media_info.quarantined_by)
+
def test_quarantine_protected_media(self) -> None:
"""
Tests that quarantining from protected media fails
|