diff options
Diffstat (limited to 'tests/federation/test_federation_media.py')
-rw-r--r-- | tests/federation/test_federation_media.py | 110 |
1 files changed, 110 insertions, 0 deletions
diff --git a/tests/federation/test_federation_media.py b/tests/federation/test_federation_media.py index 142f73cfdb..0dcf20f5f5 100644 --- a/tests/federation/test_federation_media.py +++ b/tests/federation/test_federation_media.py @@ -35,6 +35,7 @@ from synapse.types import UserID from synapse.util import Clock from tests import unittest +from tests.media.test_media_storage import small_png from tests.test_utils import SMALL_PNG @@ -146,3 +147,112 @@ class FederationMediaDownloadsTest(unittest.FederatingHomeserverTestCase): # check that the png file exists and matches what was uploaded found_file = any(SMALL_PNG in field for field in stripped_bytes) self.assertTrue(found_file) + + +class FederationThumbnailTest(unittest.FederatingHomeserverTestCase): + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + super().prepare(reactor, clock, hs) + self.test_dir = tempfile.mkdtemp(prefix="synapse-tests-") + self.addCleanup(shutil.rmtree, self.test_dir) + self.primary_base_path = os.path.join(self.test_dir, "primary") + self.secondary_base_path = os.path.join(self.test_dir, "secondary") + + hs.config.media.media_store_path = self.primary_base_path + + storage_providers = [ + StorageProviderWrapper( + FileStorageProviderBackend(hs, self.secondary_base_path), + store_local=True, + store_remote=False, + store_synchronous=True, + ) + ] + + self.filepaths = MediaFilePaths(self.primary_base_path) + self.media_storage = MediaStorage( + hs, self.primary_base_path, self.filepaths, storage_providers + ) + self.media_repo = hs.get_media_repository() + + def test_thumbnail_download_scaled(self) -> None: + content = io.BytesIO(small_png.data) + content_uri = self.get_success( + self.media_repo.create_content( + "image/png", + "test_png_thumbnail", + content, + 67, + UserID.from_string("@user_id:whatever.org"), + ) + ) + # test with an image file + channel = self.make_signed_federation_request( + "GET", + f"/_matrix/federation/v1/media/thumbnail/{content_uri.media_id}?width=32&height=32&method=scale", + ) + self.pump() + self.assertEqual(200, channel.code) + + content_type = channel.headers.getRawHeaders("content-type") + assert content_type is not None + assert "multipart/mixed" in content_type[0] + assert "boundary" in content_type[0] + + # extract boundary + boundary = content_type[0].split("boundary=")[1] + # split on boundary and check that json field and expected value exist + body = channel.result.get("body") + assert body is not None + stripped_bytes = body.split(b"\r\n" + b"--" + boundary.encode("utf-8")) + found_json = any( + b"\r\nContent-Type: application/json\r\n\r\n{}" in field + for field in stripped_bytes + ) + self.assertTrue(found_json) + + # check that the png file exists and matches the expected scaled bytes + found_file = any(small_png.expected_scaled in field for field in stripped_bytes) + self.assertTrue(found_file) + + def test_thumbnail_download_cropped(self) -> None: + content = io.BytesIO(small_png.data) + content_uri = self.get_success( + self.media_repo.create_content( + "image/png", + "test_png_thumbnail", + content, + 67, + UserID.from_string("@user_id:whatever.org"), + ) + ) + # test with an image file + channel = self.make_signed_federation_request( + "GET", + f"/_matrix/federation/v1/media/thumbnail/{content_uri.media_id}?width=32&height=32&method=crop", + ) + self.pump() + self.assertEqual(200, channel.code) + + content_type = channel.headers.getRawHeaders("content-type") + assert content_type is not None + assert "multipart/mixed" in content_type[0] + assert "boundary" in content_type[0] + + # extract boundary + boundary = content_type[0].split("boundary=")[1] + # split on boundary and check that json field and expected value exist + body = channel.result.get("body") + assert body is not None + stripped_bytes = body.split(b"\r\n" + b"--" + boundary.encode("utf-8")) + found_json = any( + b"\r\nContent-Type: application/json\r\n\r\n{}" in field + for field in stripped_bytes + ) + self.assertTrue(found_json) + + # check that the png file exists and matches the expected cropped bytes + found_file = any( + small_png.expected_cropped in field for field in stripped_bytes + ) + self.assertTrue(found_file) |