summary refs log tree commit diff
path: root/tests/federation/test_federation_media.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/federation/test_federation_media.py')
-rw-r--r--tests/federation/test_federation_media.py110
1 files changed, 110 insertions, 0 deletions
diff --git a/tests/federation/test_federation_media.py b/tests/federation/test_federation_media.py
index 142f73cfdb..0dcf20f5f5 100644
--- a/tests/federation/test_federation_media.py
+++ b/tests/federation/test_federation_media.py
@@ -35,6 +35,7 @@ from synapse.types import UserID
 from synapse.util import Clock
 
 from tests import unittest
+from tests.media.test_media_storage import small_png
 from tests.test_utils import SMALL_PNG
 
 
@@ -146,3 +147,112 @@ class FederationMediaDownloadsTest(unittest.FederatingHomeserverTestCase):
         # check that the png file exists and matches what was uploaded
         found_file = any(SMALL_PNG in field for field in stripped_bytes)
         self.assertTrue(found_file)
+
+
+class FederationThumbnailTest(unittest.FederatingHomeserverTestCase):
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        super().prepare(reactor, clock, hs)
+        self.test_dir = tempfile.mkdtemp(prefix="synapse-tests-")
+        self.addCleanup(shutil.rmtree, self.test_dir)
+        self.primary_base_path = os.path.join(self.test_dir, "primary")
+        self.secondary_base_path = os.path.join(self.test_dir, "secondary")
+
+        hs.config.media.media_store_path = self.primary_base_path
+
+        storage_providers = [
+            StorageProviderWrapper(
+                FileStorageProviderBackend(hs, self.secondary_base_path),
+                store_local=True,
+                store_remote=False,
+                store_synchronous=True,
+            )
+        ]
+
+        self.filepaths = MediaFilePaths(self.primary_base_path)
+        self.media_storage = MediaStorage(
+            hs, self.primary_base_path, self.filepaths, storage_providers
+        )
+        self.media_repo = hs.get_media_repository()
+
+    def test_thumbnail_download_scaled(self) -> None:
+        content = io.BytesIO(small_png.data)
+        content_uri = self.get_success(
+            self.media_repo.create_content(
+                "image/png",
+                "test_png_thumbnail",
+                content,
+                67,
+                UserID.from_string("@user_id:whatever.org"),
+            )
+        )
+        # test with an image file
+        channel = self.make_signed_federation_request(
+            "GET",
+            f"/_matrix/federation/v1/media/thumbnail/{content_uri.media_id}?width=32&height=32&method=scale",
+        )
+        self.pump()
+        self.assertEqual(200, channel.code)
+
+        content_type = channel.headers.getRawHeaders("content-type")
+        assert content_type is not None
+        assert "multipart/mixed" in content_type[0]
+        assert "boundary" in content_type[0]
+
+        # extract boundary
+        boundary = content_type[0].split("boundary=")[1]
+        # split on boundary and check that json field and expected value exist
+        body = channel.result.get("body")
+        assert body is not None
+        stripped_bytes = body.split(b"\r\n" + b"--" + boundary.encode("utf-8"))
+        found_json = any(
+            b"\r\nContent-Type: application/json\r\n\r\n{}" in field
+            for field in stripped_bytes
+        )
+        self.assertTrue(found_json)
+
+        # check that the png file exists and matches the expected scaled bytes
+        found_file = any(small_png.expected_scaled in field for field in stripped_bytes)
+        self.assertTrue(found_file)
+
+    def test_thumbnail_download_cropped(self) -> None:
+        content = io.BytesIO(small_png.data)
+        content_uri = self.get_success(
+            self.media_repo.create_content(
+                "image/png",
+                "test_png_thumbnail",
+                content,
+                67,
+                UserID.from_string("@user_id:whatever.org"),
+            )
+        )
+        # test with an image file
+        channel = self.make_signed_federation_request(
+            "GET",
+            f"/_matrix/federation/v1/media/thumbnail/{content_uri.media_id}?width=32&height=32&method=crop",
+        )
+        self.pump()
+        self.assertEqual(200, channel.code)
+
+        content_type = channel.headers.getRawHeaders("content-type")
+        assert content_type is not None
+        assert "multipart/mixed" in content_type[0]
+        assert "boundary" in content_type[0]
+
+        # extract boundary
+        boundary = content_type[0].split("boundary=")[1]
+        # split on boundary and check that json field and expected value exist
+        body = channel.result.get("body")
+        assert body is not None
+        stripped_bytes = body.split(b"\r\n" + b"--" + boundary.encode("utf-8"))
+        found_json = any(
+            b"\r\nContent-Type: application/json\r\n\r\n{}" in field
+            for field in stripped_bytes
+        )
+        self.assertTrue(found_json)
+
+        # check that the png file exists and matches the expected cropped bytes
+        found_file = any(
+            small_png.expected_cropped in field for field in stripped_bytes
+        )
+        self.assertTrue(found_file)