diff --git a/tests/rest/media/test_media_retention.py b/tests/rest/media/test_media_retention.py
index 23f227aed6..b59d9dfd4d 100644
--- a/tests/rest/media/test_media_retention.py
+++ b/tests/rest/media/test_media_retention.py
@@ -31,7 +31,6 @@ from tests.utils import MockClock
class MediaRetentionTestCase(unittest.HomeserverTestCase):
-
ONE_DAY_IN_MS = 24 * 60 * 60 * 1000
THIRTY_DAYS_IN_MS = 30 * ONE_DAY_IN_MS
diff --git a/tests/rest/media/v1/test_url_preview.py b/tests/rest/media/test_url_preview.py
index 2c321f8d04..e91dc581c2 100644
--- a/tests/rest/media/v1/test_url_preview.py
+++ b/tests/rest/media/test_url_preview.py
@@ -26,8 +26,8 @@ from twisted.internet.interfaces import IAddress, IResolutionReceiver
from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactor
from synapse.config.oembed import OEmbedEndpointConfig
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
-from synapse.rest.media.v1.preview_url_resource import IMAGE_CACHE_EXPIRY_MS
+from synapse.rest.media.media_repository_resource import MediaRepositoryResource
+from synapse.rest.media.preview_url_resource import IMAGE_CACHE_EXPIRY_MS
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
@@ -58,7 +58,6 @@ class URLPreviewTests(unittest.HomeserverTestCase):
)
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-
config = self.default_config()
config["url_preview_enabled"] = True
config["max_spider_size"] = 9999999
@@ -83,7 +82,7 @@ class URLPreviewTests(unittest.HomeserverTestCase):
config["media_store_path"] = self.media_store_path
provider_config = {
- "module": "synapse.rest.media.v1.storage_provider.FileStorageProviderBackend",
+ "module": "synapse.media.storage_provider.FileStorageProviderBackend",
"store_local": True,
"store_synchronous": False,
"store_remote": True,
@@ -118,7 +117,6 @@ class URLPreviewTests(unittest.HomeserverTestCase):
return hs
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
-
self.media_repo = hs.get_media_repository_resource()
self.preview_url = self.media_repo.children[b"preview_url"]
@@ -133,7 +131,6 @@ class URLPreviewTests(unittest.HomeserverTestCase):
addressTypes: Optional[Sequence[Type[IAddress]]] = None,
transportSemantics: str = "TCP",
) -> IResolutionReceiver:
-
resolution = HostResolution(hostName)
resolutionReceiver.resolutionBegan(resolution)
if hostName not in self.lookups:
@@ -660,7 +657,7 @@ class URLPreviewTests(unittest.HomeserverTestCase):
"""If the preview image doesn't exist, ensure some data is returned."""
self.lookups["matrix.org"] = [(IPv4Address, "10.1.2.3")]
- end_content = (
+ result = (
b"""<html><body><img src="http://cdn.matrix.org/foo.jpg"></body></html>"""
)
@@ -681,8 +678,8 @@ class URLPreviewTests(unittest.HomeserverTestCase):
b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n"
b'Content-Type: text/html; charset="utf8"\r\n\r\n'
)
- % (len(end_content),)
- + end_content
+ % (len(result),)
+ + result
)
self.pump()
@@ -691,6 +688,44 @@ class URLPreviewTests(unittest.HomeserverTestCase):
# The image should not be in the result.
self.assertNotIn("og:image", channel.json_body)
+ def test_oembed_failure(self) -> None:
+ """If the autodiscovered oEmbed URL fails, ensure some data is returned."""
+ self.lookups["matrix.org"] = [(IPv4Address, "10.1.2.3")]
+
+ result = b"""
+ <title>oEmbed Autodiscovery Fail</title>
+ <link rel="alternate" type="application/json+oembed"
+ href="http://example.com/oembed?url=http%3A%2F%2Fmatrix.org&format=json"
+ title="matrixdotorg" />
+ """
+
+ channel = self.make_request(
+ "GET",
+ "preview_url?url=http://matrix.org",
+ shorthand=False,
+ await_result=False,
+ )
+ self.pump()
+
+ client = self.reactor.tcpClients[0][2].buildProtocol(None)
+ server = AccumulatingProtocol()
+ server.makeConnection(FakeTransport(client, self.reactor))
+ client.makeConnection(FakeTransport(server, self.reactor))
+ client.dataReceived(
+ (
+ b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n"
+ b'Content-Type: text/html; charset="utf8"\r\n\r\n'
+ )
+ % (len(result),)
+ + result
+ )
+
+ self.pump()
+ self.assertEqual(channel.code, 200)
+
+        # Even though the oEmbed request failed, the title from the HTML
+        # should still be returned.
+ self.assertEqual(channel.json_body["og:title"], "oEmbed Autodiscovery Fail")
+
def test_data_url(self) -> None:
"""
Requesting to preview a data URL is not supported.
diff --git a/tests/rest/media/v1/__init__.py b/tests/rest/media/v1/__init__.py
deleted file mode 100644
index b1ee10cfcc..0000000000
--- a/tests/rest/media/v1/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/tests/rest/media/v1/test_base.py b/tests/rest/media/v1/test_base.py
deleted file mode 100644
index c73179151a..0000000000
--- a/tests/rest/media/v1/test_base.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.rest.media.v1._base import get_filename_from_headers
-
-from tests import unittest
-
-
-class GetFileNameFromHeadersTests(unittest.TestCase):
- # input -> expected result
- TEST_CASES = {
- b"inline; filename=abc.txt": "abc.txt",
- b'inline; filename="azerty"': "azerty",
- b'inline; filename="aze%20rty"': "aze%20rty",
- b'inline; filename="aze"rty"': 'aze"rty',
- b'inline; filename="azer;ty"': "azer;ty",
- b"inline; filename*=utf-8''foo%C2%A3bar": "foo£bar",
- }
-
- def tests(self) -> None:
- for hdr, expected in self.TEST_CASES.items():
- res = get_filename_from_headers({b"Content-Disposition": [hdr]})
- self.assertEqual(
- res,
- expected,
- f"expected output for {hdr!r} to be {expected} but was {res}",
- )
diff --git a/tests/rest/media/v1/test_filepath.py b/tests/rest/media/v1/test_filepath.py
deleted file mode 100644
index 43e6f0f70a..0000000000
--- a/tests/rest/media/v1/test_filepath.py
+++ /dev/null
@@ -1,595 +0,0 @@
-# Copyright 2021 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import inspect
-import os
-from typing import Iterable
-
-from synapse.rest.media.v1.filepath import MediaFilePaths, _wrap_with_jail_check
-
-from tests import unittest
-
-
-class MediaFilePathsTestCase(unittest.TestCase):
- def setUp(self) -> None:
- super().setUp()
-
- self.filepaths = MediaFilePaths("/media_store")
-
- def test_local_media_filepath(self) -> None:
- """Test local media paths"""
- self.assertEqual(
- self.filepaths.local_media_filepath_rel("GerZNDnDZVjsOtardLuwfIBg"),
- "local_content/Ge/rZ/NDnDZVjsOtardLuwfIBg",
- )
- self.assertEqual(
- self.filepaths.local_media_filepath("GerZNDnDZVjsOtardLuwfIBg"),
- "/media_store/local_content/Ge/rZ/NDnDZVjsOtardLuwfIBg",
- )
-
- def test_local_media_thumbnail(self) -> None:
- """Test local media thumbnail paths"""
- self.assertEqual(
- self.filepaths.local_media_thumbnail_rel(
- "GerZNDnDZVjsOtardLuwfIBg", 800, 600, "image/jpeg", "scale"
- ),
- "local_thumbnails/Ge/rZ/NDnDZVjsOtardLuwfIBg/800-600-image-jpeg-scale",
- )
- self.assertEqual(
- self.filepaths.local_media_thumbnail(
- "GerZNDnDZVjsOtardLuwfIBg", 800, 600, "image/jpeg", "scale"
- ),
- "/media_store/local_thumbnails/Ge/rZ/NDnDZVjsOtardLuwfIBg/800-600-image-jpeg-scale",
- )
-
- def test_local_media_thumbnail_dir(self) -> None:
- """Test local media thumbnail directory paths"""
- self.assertEqual(
- self.filepaths.local_media_thumbnail_dir("GerZNDnDZVjsOtardLuwfIBg"),
- "/media_store/local_thumbnails/Ge/rZ/NDnDZVjsOtardLuwfIBg",
- )
-
- def test_remote_media_filepath(self) -> None:
- """Test remote media paths"""
- self.assertEqual(
- self.filepaths.remote_media_filepath_rel(
- "example.com", "GerZNDnDZVjsOtardLuwfIBg"
- ),
- "remote_content/example.com/Ge/rZ/NDnDZVjsOtardLuwfIBg",
- )
- self.assertEqual(
- self.filepaths.remote_media_filepath(
- "example.com", "GerZNDnDZVjsOtardLuwfIBg"
- ),
- "/media_store/remote_content/example.com/Ge/rZ/NDnDZVjsOtardLuwfIBg",
- )
-
- def test_remote_media_thumbnail(self) -> None:
- """Test remote media thumbnail paths"""
- self.assertEqual(
- self.filepaths.remote_media_thumbnail_rel(
- "example.com",
- "GerZNDnDZVjsOtardLuwfIBg",
- 800,
- 600,
- "image/jpeg",
- "scale",
- ),
- "remote_thumbnail/example.com/Ge/rZ/NDnDZVjsOtardLuwfIBg/800-600-image-jpeg-scale",
- )
- self.assertEqual(
- self.filepaths.remote_media_thumbnail(
- "example.com",
- "GerZNDnDZVjsOtardLuwfIBg",
- 800,
- 600,
- "image/jpeg",
- "scale",
- ),
- "/media_store/remote_thumbnail/example.com/Ge/rZ/NDnDZVjsOtardLuwfIBg/800-600-image-jpeg-scale",
- )
-
- def test_remote_media_thumbnail_legacy(self) -> None:
- """Test old-style remote media thumbnail paths"""
- self.assertEqual(
- self.filepaths.remote_media_thumbnail_rel_legacy(
- "example.com", "GerZNDnDZVjsOtardLuwfIBg", 800, 600, "image/jpeg"
- ),
- "remote_thumbnail/example.com/Ge/rZ/NDnDZVjsOtardLuwfIBg/800-600-image-jpeg",
- )
-
- def test_remote_media_thumbnail_dir(self) -> None:
- """Test remote media thumbnail directory paths"""
- self.assertEqual(
- self.filepaths.remote_media_thumbnail_dir(
- "example.com", "GerZNDnDZVjsOtardLuwfIBg"
- ),
- "/media_store/remote_thumbnail/example.com/Ge/rZ/NDnDZVjsOtardLuwfIBg",
- )
-
- def test_url_cache_filepath(self) -> None:
- """Test URL cache paths"""
- self.assertEqual(
- self.filepaths.url_cache_filepath_rel("2020-01-02_GerZNDnDZVjsOtar"),
- "url_cache/2020-01-02/GerZNDnDZVjsOtar",
- )
- self.assertEqual(
- self.filepaths.url_cache_filepath("2020-01-02_GerZNDnDZVjsOtar"),
- "/media_store/url_cache/2020-01-02/GerZNDnDZVjsOtar",
- )
-
- def test_url_cache_filepath_legacy(self) -> None:
- """Test old-style URL cache paths"""
- self.assertEqual(
- self.filepaths.url_cache_filepath_rel("GerZNDnDZVjsOtardLuwfIBg"),
- "url_cache/Ge/rZ/NDnDZVjsOtardLuwfIBg",
- )
- self.assertEqual(
- self.filepaths.url_cache_filepath("GerZNDnDZVjsOtardLuwfIBg"),
- "/media_store/url_cache/Ge/rZ/NDnDZVjsOtardLuwfIBg",
- )
-
- def test_url_cache_filepath_dirs_to_delete(self) -> None:
- """Test URL cache cleanup paths"""
- self.assertEqual(
- self.filepaths.url_cache_filepath_dirs_to_delete(
- "2020-01-02_GerZNDnDZVjsOtar"
- ),
- ["/media_store/url_cache/2020-01-02"],
- )
-
- def test_url_cache_filepath_dirs_to_delete_legacy(self) -> None:
- """Test old-style URL cache cleanup paths"""
- self.assertEqual(
- self.filepaths.url_cache_filepath_dirs_to_delete(
- "GerZNDnDZVjsOtardLuwfIBg"
- ),
- [
- "/media_store/url_cache/Ge/rZ",
- "/media_store/url_cache/Ge",
- ],
- )
-
- def test_url_cache_thumbnail(self) -> None:
- """Test URL cache thumbnail paths"""
- self.assertEqual(
- self.filepaths.url_cache_thumbnail_rel(
- "2020-01-02_GerZNDnDZVjsOtar", 800, 600, "image/jpeg", "scale"
- ),
- "url_cache_thumbnails/2020-01-02/GerZNDnDZVjsOtar/800-600-image-jpeg-scale",
- )
- self.assertEqual(
- self.filepaths.url_cache_thumbnail(
- "2020-01-02_GerZNDnDZVjsOtar", 800, 600, "image/jpeg", "scale"
- ),
- "/media_store/url_cache_thumbnails/2020-01-02/GerZNDnDZVjsOtar/800-600-image-jpeg-scale",
- )
-
- def test_url_cache_thumbnail_legacy(self) -> None:
- """Test old-style URL cache thumbnail paths"""
- self.assertEqual(
- self.filepaths.url_cache_thumbnail_rel(
- "GerZNDnDZVjsOtardLuwfIBg", 800, 600, "image/jpeg", "scale"
- ),
- "url_cache_thumbnails/Ge/rZ/NDnDZVjsOtardLuwfIBg/800-600-image-jpeg-scale",
- )
- self.assertEqual(
- self.filepaths.url_cache_thumbnail(
- "GerZNDnDZVjsOtardLuwfIBg", 800, 600, "image/jpeg", "scale"
- ),
- "/media_store/url_cache_thumbnails/Ge/rZ/NDnDZVjsOtardLuwfIBg/800-600-image-jpeg-scale",
- )
-
- def test_url_cache_thumbnail_directory(self) -> None:
- """Test URL cache thumbnail directory paths"""
- self.assertEqual(
- self.filepaths.url_cache_thumbnail_directory_rel(
- "2020-01-02_GerZNDnDZVjsOtar"
- ),
- "url_cache_thumbnails/2020-01-02/GerZNDnDZVjsOtar",
- )
- self.assertEqual(
- self.filepaths.url_cache_thumbnail_directory("2020-01-02_GerZNDnDZVjsOtar"),
- "/media_store/url_cache_thumbnails/2020-01-02/GerZNDnDZVjsOtar",
- )
-
- def test_url_cache_thumbnail_directory_legacy(self) -> None:
- """Test old-style URL cache thumbnail directory paths"""
- self.assertEqual(
- self.filepaths.url_cache_thumbnail_directory_rel(
- "GerZNDnDZVjsOtardLuwfIBg"
- ),
- "url_cache_thumbnails/Ge/rZ/NDnDZVjsOtardLuwfIBg",
- )
- self.assertEqual(
- self.filepaths.url_cache_thumbnail_directory("GerZNDnDZVjsOtardLuwfIBg"),
- "/media_store/url_cache_thumbnails/Ge/rZ/NDnDZVjsOtardLuwfIBg",
- )
-
- def test_url_cache_thumbnail_dirs_to_delete(self) -> None:
- """Test URL cache thumbnail cleanup paths"""
- self.assertEqual(
- self.filepaths.url_cache_thumbnail_dirs_to_delete(
- "2020-01-02_GerZNDnDZVjsOtar"
- ),
- [
- "/media_store/url_cache_thumbnails/2020-01-02/GerZNDnDZVjsOtar",
- "/media_store/url_cache_thumbnails/2020-01-02",
- ],
- )
-
- def test_url_cache_thumbnail_dirs_to_delete_legacy(self) -> None:
- """Test old-style URL cache thumbnail cleanup paths"""
- self.assertEqual(
- self.filepaths.url_cache_thumbnail_dirs_to_delete(
- "GerZNDnDZVjsOtardLuwfIBg"
- ),
- [
- "/media_store/url_cache_thumbnails/Ge/rZ/NDnDZVjsOtardLuwfIBg",
- "/media_store/url_cache_thumbnails/Ge/rZ",
- "/media_store/url_cache_thumbnails/Ge",
- ],
- )
-
- def test_server_name_validation(self) -> None:
- """Test validation of server names"""
- self._test_path_validation(
- [
- "remote_media_filepath_rel",
- "remote_media_filepath",
- "remote_media_thumbnail_rel",
- "remote_media_thumbnail",
- "remote_media_thumbnail_rel_legacy",
- "remote_media_thumbnail_dir",
- ],
- parameter="server_name",
- valid_values=[
- "matrix.org",
- "matrix.org:8448",
- "matrix-federation.matrix.org",
- "matrix-federation.matrix.org:8448",
- "10.1.12.123",
- "10.1.12.123:8448",
- "[fd00:abcd::ffff]",
- "[fd00:abcd::ffff]:8448",
- ],
- invalid_values=[
- "/matrix.org",
- "matrix.org/..",
- "matrix.org\x00",
- "",
- ".",
- "..",
- "/",
- ],
- )
-
- def test_file_id_validation(self) -> None:
- """Test validation of local, remote and legacy URL cache file / media IDs"""
- # File / media IDs get split into three parts to form paths, consisting of the
- # first two characters, next two characters and rest of the ID.
- valid_file_ids = [
- "GerZNDnDZVjsOtardLuwfIBg",
- # Unexpected, but produces an acceptable path:
- "GerZN", # "N" becomes the last directory
- ]
- invalid_file_ids = [
- "/erZNDnDZVjsOtardLuwfIBg",
- "Ge/ZNDnDZVjsOtardLuwfIBg",
- "GerZ/DnDZVjsOtardLuwfIBg",
- "GerZ/..",
- "G\x00rZNDnDZVjsOtardLuwfIBg",
- "Ger\x00NDnDZVjsOtardLuwfIBg",
- "GerZNDnDZVjsOtardLuwfIBg\x00",
- "",
- "Ge",
- "GerZ",
- "GerZ.",
- "..rZNDnDZVjsOtardLuwfIBg",
- "Ge..NDnDZVjsOtardLuwfIBg",
- "GerZ..",
- "GerZ/",
- ]
-
- self._test_path_validation(
- [
- "local_media_filepath_rel",
- "local_media_filepath",
- "local_media_thumbnail_rel",
- "local_media_thumbnail",
- "local_media_thumbnail_dir",
- # Legacy URL cache media IDs
- "url_cache_filepath_rel",
- "url_cache_filepath",
- # `url_cache_filepath_dirs_to_delete` is tested below.
- "url_cache_thumbnail_rel",
- "url_cache_thumbnail",
- "url_cache_thumbnail_directory_rel",
- "url_cache_thumbnail_directory",
- "url_cache_thumbnail_dirs_to_delete",
- ],
- parameter="media_id",
- valid_values=valid_file_ids,
- invalid_values=invalid_file_ids,
- )
-
- # `url_cache_filepath_dirs_to_delete` ignores what would be the last path
- # component, so only the first 4 characters matter.
- self._test_path_validation(
- [
- "url_cache_filepath_dirs_to_delete",
- ],
- parameter="media_id",
- valid_values=valid_file_ids,
- invalid_values=[
- "/erZNDnDZVjsOtardLuwfIBg",
- "Ge/ZNDnDZVjsOtardLuwfIBg",
- "G\x00rZNDnDZVjsOtardLuwfIBg",
- "Ger\x00NDnDZVjsOtardLuwfIBg",
- "",
- "Ge",
- "..rZNDnDZVjsOtardLuwfIBg",
- "Ge..NDnDZVjsOtardLuwfIBg",
- ],
- )
-
- self._test_path_validation(
- [
- "remote_media_filepath_rel",
- "remote_media_filepath",
- "remote_media_thumbnail_rel",
- "remote_media_thumbnail",
- "remote_media_thumbnail_rel_legacy",
- "remote_media_thumbnail_dir",
- ],
- parameter="file_id",
- valid_values=valid_file_ids,
- invalid_values=invalid_file_ids,
- )
-
- def test_url_cache_media_id_validation(self) -> None:
- """Test validation of URL cache media IDs"""
- self._test_path_validation(
- [
- "url_cache_filepath_rel",
- "url_cache_filepath",
- # `url_cache_filepath_dirs_to_delete` only cares about the date prefix
- "url_cache_thumbnail_rel",
- "url_cache_thumbnail",
- "url_cache_thumbnail_directory_rel",
- "url_cache_thumbnail_directory",
- "url_cache_thumbnail_dirs_to_delete",
- ],
- parameter="media_id",
- valid_values=[
- "2020-01-02_GerZNDnDZVjsOtar",
- "2020-01-02_G", # Unexpected, but produces an acceptable path
- ],
- invalid_values=[
- "2020-01-02",
- "2020-01-02-",
- "2020-01-02-.",
- "2020-01-02-..",
- "2020-01-02-/",
- "2020-01-02-/GerZNDnDZVjsOtar",
- "2020-01-02-GerZNDnDZVjsOtar/..",
- "2020-01-02-GerZNDnDZVjsOtar\x00",
- ],
- )
-
- def test_content_type_validation(self) -> None:
- """Test validation of thumbnail content types"""
- self._test_path_validation(
- [
- "local_media_thumbnail_rel",
- "local_media_thumbnail",
- "remote_media_thumbnail_rel",
- "remote_media_thumbnail",
- "remote_media_thumbnail_rel_legacy",
- "url_cache_thumbnail_rel",
- "url_cache_thumbnail",
- ],
- parameter="content_type",
- valid_values=[
- "image/jpeg",
- ],
- invalid_values=[
- "", # ValueError: not enough values to unpack
- "image/jpeg/abc", # ValueError: too many values to unpack
- "image/jpeg\x00",
- ],
- )
-
- def test_thumbnail_method_validation(self) -> None:
- """Test validation of thumbnail methods"""
- self._test_path_validation(
- [
- "local_media_thumbnail_rel",
- "local_media_thumbnail",
- "remote_media_thumbnail_rel",
- "remote_media_thumbnail",
- "url_cache_thumbnail_rel",
- "url_cache_thumbnail",
- ],
- parameter="method",
- valid_values=[
- "crop",
- "scale",
- ],
- invalid_values=[
- "/scale",
- "scale/..",
- "scale\x00",
- "/",
- ],
- )
-
- def _test_path_validation(
- self,
- methods: Iterable[str],
- parameter: str,
- valid_values: Iterable[str],
- invalid_values: Iterable[str],
- ) -> None:
- """Test that the specified methods validate the named parameter as expected
-
- Args:
- methods: The names of `MediaFilePaths` methods to test
- parameter: The name of the parameter to test
- valid_values: A list of parameter values that are expected to be accepted
- invalid_values: A list of parameter values that are expected to be rejected
-
- Raises:
- AssertionError: If a value was accepted when it should have failed
- validation.
- ValueError: If a value failed validation when it should have been accepted.
- """
- for method in methods:
- get_path = getattr(self.filepaths, method)
-
- parameters = inspect.signature(get_path).parameters
- kwargs = {
- "server_name": "matrix.org",
- "media_id": "GerZNDnDZVjsOtardLuwfIBg",
- "file_id": "GerZNDnDZVjsOtardLuwfIBg",
- "width": 800,
- "height": 600,
- "content_type": "image/jpeg",
- "method": "scale",
- }
-
- if get_path.__name__.startswith("url_"):
- kwargs["media_id"] = "2020-01-02_GerZNDnDZVjsOtar"
-
- kwargs = {k: v for k, v in kwargs.items() if k in parameters}
- kwargs.pop(parameter)
-
- for value in valid_values:
- kwargs[parameter] = value
- get_path(**kwargs)
- # No exception should be raised
-
- for value in invalid_values:
- with self.assertRaises(ValueError):
- kwargs[parameter] = value
- path_or_list = get_path(**kwargs)
- self.fail(
- f"{value!r} unexpectedly passed validation: "
- f"{method} returned {path_or_list!r}"
- )
-
-
-class MediaFilePathsJailTestCase(unittest.TestCase):
- def _check_relative_path(self, filepaths: MediaFilePaths, path: str) -> None:
- """Passes a relative path through the jail check.
-
- Args:
- filepaths: The `MediaFilePaths` instance.
- path: A path relative to the media store directory.
-
- Raises:
- ValueError: If the jail check fails.
- """
-
- @_wrap_with_jail_check(relative=True)
- def _make_relative_path(self: MediaFilePaths, path: str) -> str:
- return path
-
- _make_relative_path(filepaths, path)
-
- def _check_absolute_path(self, filepaths: MediaFilePaths, path: str) -> None:
- """Passes an absolute path through the jail check.
-
- Args:
- filepaths: The `MediaFilePaths` instance.
- path: A path relative to the media store directory.
-
- Raises:
- ValueError: If the jail check fails.
- """
-
- @_wrap_with_jail_check(relative=False)
- def _make_absolute_path(self: MediaFilePaths, path: str) -> str:
- return os.path.join(self.base_path, path)
-
- _make_absolute_path(filepaths, path)
-
- def test_traversal_inside(self) -> None:
- """Test the jail check for paths that stay within the media directory."""
- # Despite the `../`s, these paths still lie within the media directory and it's
- # expected for the jail check to allow them through.
- # These paths ought to trip the other checks in place and should never be
- # returned.
- filepaths = MediaFilePaths("/media_store")
- path = "url_cache/2020-01-02/../../GerZNDnDZVjsOtar"
- self._check_relative_path(filepaths, path)
- self._check_absolute_path(filepaths, path)
-
- def test_traversal_outside(self) -> None:
- """Test that the jail check fails for paths that escape the media directory."""
- filepaths = MediaFilePaths("/media_store")
- path = "url_cache/2020-01-02/../../../GerZNDnDZVjsOtar"
- with self.assertRaises(ValueError):
- self._check_relative_path(filepaths, path)
- with self.assertRaises(ValueError):
- self._check_absolute_path(filepaths, path)
-
- def test_traversal_reentry(self) -> None:
- """Test the jail check for paths that exit and re-enter the media directory."""
- # These paths lie outside the media directory if it is a symlink, and inside
- # otherwise. Ideally the check should fail, but this proves difficult.
- # This test documents the behaviour for this edge case.
- # These paths ought to trip the other checks in place and should never be
- # returned.
- filepaths = MediaFilePaths("/media_store")
- path = "url_cache/2020-01-02/../../../media_store/GerZNDnDZVjsOtar"
- self._check_relative_path(filepaths, path)
- self._check_absolute_path(filepaths, path)
-
- def test_symlink(self) -> None:
- """Test that a symlink does not cause the jail check to fail."""
- media_store_path = self.mktemp()
-
- # symlink the media store directory
- os.symlink("/mnt/synapse/media_store", media_store_path)
-
- # Test that relative and absolute paths don't trip the check
- # NB: `media_store_path` is a relative path
- filepaths = MediaFilePaths(media_store_path)
- self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
- self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
-
- filepaths = MediaFilePaths(os.path.abspath(media_store_path))
- self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
- self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
-
- def test_symlink_subdirectory(self) -> None:
- """Test that a symlinked subdirectory does not cause the jail check to fail."""
- media_store_path = self.mktemp()
- os.mkdir(media_store_path)
-
- # symlink `url_cache/`
- os.symlink(
- "/mnt/synapse/media_store_url_cache",
- os.path.join(media_store_path, "url_cache"),
- )
-
- # Test that relative and absolute paths don't trip the check
- # NB: `media_store_path` is a relative path
- filepaths = MediaFilePaths(media_store_path)
- self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
- self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
-
- filepaths = MediaFilePaths(os.path.abspath(media_store_path))
- self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
- self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
diff --git a/tests/rest/media/v1/test_html_preview.py b/tests/rest/media/v1/test_html_preview.py
deleted file mode 100644
index 1062081a06..0000000000
--- a/tests/rest/media/v1/test_html_preview.py
+++ /dev/null
@@ -1,542 +0,0 @@
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.rest.media.v1.preview_html import (
- _get_html_media_encodings,
- decode_body,
- parse_html_to_open_graph,
- summarize_paragraphs,
-)
-
-from tests import unittest
-
-try:
- import lxml
-except ImportError:
- lxml = None
-
-
-class SummarizeTestCase(unittest.TestCase):
- if not lxml:
- skip = "url preview feature requires lxml"
-
- def test_long_summarize(self) -> None:
- example_paras = [
- """Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami:
- Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in
- Troms county, Norway. The administrative centre of the municipality is
- the city of Tromsø. Outside of Norway, Tromso and Tromsö are
- alternative spellings of the city.Tromsø is considered the northernmost
- city in the world with a population above 50,000. The most populous town
- north of it is Alta, Norway, with a population of 14,272 (2013).""",
- """Tromsø lies in Northern Norway. The municipality has a population of
- (2015) 72,066, but with an annual influx of students it has over 75,000
- most of the year. It is the largest urban area in Northern Norway and the
- third largest north of the Arctic Circle (following Murmansk and Norilsk).
- Most of Tromsø, including the city centre, is located on the island of
- Tromsøya, 350 kilometres (217 mi) north of the Arctic Circle. In 2012,
- Tromsøya had a population of 36,088. Substantial parts of the urban area
- are also situated on the mainland to the east, and on parts of Kvaløya—a
- large island to the west. Tromsøya is connected to the mainland by the Tromsø
- Bridge and the Tromsøysund Tunnel, and to the island of Kvaløya by the
- Sandnessund Bridge. Tromsø Airport connects the city to many destinations
- in Europe. The city is warmer than most other places located on the same
- latitude, due to the warming effect of the Gulf Stream.""",
- """The city centre of Tromsø contains the highest number of old wooden
- houses in Northern Norway, the oldest house dating from 1789. The Arctic
- Cathedral, a modern church from 1965, is probably the most famous landmark
- in Tromsø. The city is a cultural centre for its region, with several
- festivals taking place in the summer. Some of Norway's best-known
- musicians, Torbjørn Brundtland and Svein Berge of the electronica duo
- Röyksopp and Lene Marlin grew up and started their careers in Tromsø.
- Noted electronic musician Geir Jenssen also hails from Tromsø.""",
- ]
-
- desc = summarize_paragraphs(example_paras, min_size=200, max_size=500)
-
- self.assertEqual(
- desc,
- "Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami:"
- " Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in"
- " Troms county, Norway. The administrative centre of the municipality is"
- " the city of Tromsø. Outside of Norway, Tromso and Tromsö are"
- " alternative spellings of the city.Tromsø is considered the northernmost"
- " city in the world with a population above 50,000. The most populous town"
- " north of it is Alta, Norway, with a population of 14,272 (2013).",
- )
-
- desc = summarize_paragraphs(example_paras[1:], min_size=200, max_size=500)
-
- self.assertEqual(
- desc,
- "Tromsø lies in Northern Norway. The municipality has a population of"
- " (2015) 72,066, but with an annual influx of students it has over 75,000"
- " most of the year. It is the largest urban area in Northern Norway and the"
- " third largest north of the Arctic Circle (following Murmansk and Norilsk)."
- " Most of Tromsø, including the city centre, is located on the island of"
- " Tromsøya, 350 kilometres (217 mi) north of the Arctic Circle. In 2012,"
- " Tromsøya had a population of 36,088. Substantial parts of the urban…",
- )
-
- def test_short_summarize(self) -> None:
- example_paras = [
- "Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami:"
- " Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in"
- " Troms county, Norway.",
- "Tromsø lies in Northern Norway. The municipality has a population of"
- " (2015) 72,066, but with an annual influx of students it has over 75,000"
- " most of the year.",
- "The city centre of Tromsø contains the highest number of old wooden"
- " houses in Northern Norway, the oldest house dating from 1789. The Arctic"
- " Cathedral, a modern church from 1965, is probably the most famous landmark"
- " in Tromsø.",
- ]
-
- desc = summarize_paragraphs(example_paras, min_size=200, max_size=500)
-
- self.assertEqual(
- desc,
- "Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami:"
- " Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in"
- " Troms county, Norway.\n"
- "\n"
- "Tromsø lies in Northern Norway. The municipality has a population of"
- " (2015) 72,066, but with an annual influx of students it has over 75,000"
- " most of the year.",
- )
-
- def test_small_then_large_summarize(self) -> None:
- example_paras = [
- "Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami:"
- " Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in"
- " Troms county, Norway.",
- "Tromsø lies in Northern Norway. The municipality has a population of"
- " (2015) 72,066, but with an annual influx of students it has over 75,000"
- " most of the year."
- " The city centre of Tromsø contains the highest number of old wooden"
- " houses in Northern Norway, the oldest house dating from 1789. The Arctic"
- " Cathedral, a modern church from 1965, is probably the most famous landmark"
- " in Tromsø.",
- ]
-
- desc = summarize_paragraphs(example_paras, min_size=200, max_size=500)
- self.assertEqual(
- desc,
- "Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami:"
- " Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in"
- " Troms county, Norway.\n"
- "\n"
- "Tromsø lies in Northern Norway. The municipality has a population of"
- " (2015) 72,066, but with an annual influx of students it has over 75,000"
- " most of the year. The city centre of Tromsø contains the highest number"
- " of old wooden houses in Northern Norway, the oldest house dating from"
- " 1789. The Arctic Cathedral, a modern church from…",
- )
-
-
-class OpenGraphFromHtmlTestCase(unittest.TestCase):
- if not lxml:
- skip = "url preview feature requires lxml"
-
- def test_simple(self) -> None:
- html = b"""
- <html>
- <head><title>Foo</title></head>
- <body>
- Some text.
- </body>
- </html>
- """
-
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
-
- self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
-
- def test_comment(self) -> None:
- html = b"""
- <html>
- <head><title>Foo</title></head>
- <body>
- <!-- HTML comment -->
- Some text.
- </body>
- </html>
- """
-
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
-
- self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
-
- def test_comment2(self) -> None:
- html = b"""
- <html>
- <head><title>Foo</title></head>
- <body>
- Some text.
- <!-- HTML comment -->
- Some more text.
- <p>Text</p>
- More text
- </body>
- </html>
- """
-
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
-
- self.assertEqual(
- og,
- {
- "og:title": "Foo",
- "og:description": "Some text.\n\nSome more text.\n\nText\n\nMore text",
- },
- )
-
- def test_script(self) -> None:
- html = b"""
- <html>
- <head><title>Foo</title></head>
- <body>
- <script> (function() {})() </script>
- Some text.
- </body>
- </html>
- """
-
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
-
- self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
-
- def test_missing_title(self) -> None:
- html = b"""
- <html>
- <body>
- Some text.
- </body>
- </html>
- """
-
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
-
- self.assertEqual(og, {"og:title": None, "og:description": "Some text."})
-
- # Another variant is a title with no content.
- html = b"""
- <html>
- <head><title></title></head>
- <body>
- <h1>Title</h1>
- </body>
- </html>
- """
-
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
-
- self.assertEqual(og, {"og:title": "Title", "og:description": "Title"})
-
- def test_h1_as_title(self) -> None:
- html = b"""
- <html>
- <meta property="og:description" content="Some text."/>
- <body>
- <h1>Title</h1>
- </body>
- </html>
- """
-
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
-
- self.assertEqual(og, {"og:title": "Title", "og:description": "Some text."})
-
- def test_empty_description(self) -> None:
- """Description tags with empty content should be ignored."""
- html = b"""
- <html>
- <meta property="og:description" content=""/>
- <meta property="og:description"/>
- <meta name="description" content=""/>
- <meta name="description"/>
- <meta name="description" content="Finally!"/>
- <body>
- <h1>Title</h1>
- </body>
- </html>
- """
-
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
-
- self.assertEqual(og, {"og:title": "Title", "og:description": "Finally!"})
-
- def test_missing_title_and_broken_h1(self) -> None:
- html = b"""
- <html>
- <body>
- <h1><a href="foo"/></h1>
- Some text.
- </body>
- </html>
- """
-
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
-
- self.assertEqual(og, {"og:title": None, "og:description": "Some text."})
-
- def test_empty(self) -> None:
- """Test a body with no data in it."""
- html = b""
- tree = decode_body(html, "http://example.com/test.html")
- self.assertIsNone(tree)
-
- def test_no_tree(self) -> None:
- """A valid body with no tree in it."""
- html = b"\x00"
- tree = decode_body(html, "http://example.com/test.html")
- self.assertIsNone(tree)
-
- def test_xml(self) -> None:
- """Test decoding XML and ensure it works properly."""
- # Note that the strip() call is important to ensure the xml tag starts
- # at the initial byte.
- html = b"""
- <?xml version="1.0" encoding="UTF-8"?>
-
- <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
- <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
- <head><title>Foo</title></head><body>Some text.</body></html>
- """.strip()
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
- self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
-
- def test_invalid_encoding(self) -> None:
- """An invalid character encoding should be ignored and treated as UTF-8, if possible."""
- html = b"""
- <html>
- <head><title>Foo</title></head>
- <body>
- Some text.
- </body>
- </html>
- """
- tree = decode_body(html, "http://example.com/test.html", "invalid-encoding")
- og = parse_html_to_open_graph(tree)
- self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
-
- def test_invalid_encoding2(self) -> None:
- """A body which doesn't match the sent character encoding."""
- # Note that this contains an invalid UTF-8 sequence in the title.
- html = b"""
- <html>
- <head><title>\xff\xff Foo</title></head>
- <body>
- Some text.
- </body>
- </html>
- """
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
- self.assertEqual(og, {"og:title": "ÿÿ Foo", "og:description": "Some text."})
-
- def test_windows_1252(self) -> None:
- """A body which uses cp1252, but doesn't declare that."""
- html = b"""
- <html>
- <head><title>\xf3</title></head>
- <body>
- Some text.
- </body>
- </html>
- """
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
- self.assertEqual(og, {"og:title": "ó", "og:description": "Some text."})
-
- def test_twitter_tag(self) -> None:
- """Twitter card tags should be used if nothing else is available."""
- html = b"""
- <html>
- <meta name="twitter:card" content="summary">
- <meta name="twitter:description" content="Description">
- <meta name="twitter:site" content="@matrixdotorg">
- </html>
- """
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
- self.assertEqual(
- og,
- {
- "og:title": None,
- "og:description": "Description",
- "og:site_name": "@matrixdotorg",
- },
- )
-
- # But they shouldn't override Open Graph values.
- html = b"""
- <html>
- <meta name="twitter:card" content="summary">
- <meta name="twitter:description" content="Description">
- <meta property="og:description" content="Real Description">
- <meta name="twitter:site" content="@matrixdotorg">
- <meta property="og:site_name" content="matrix.org">
- </html>
- """
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
- self.assertEqual(
- og,
- {
- "og:title": None,
- "og:description": "Real Description",
- "og:site_name": "matrix.org",
- },
- )
-
- def test_nested_nodes(self) -> None:
- """A body with some nested nodes. Tests that we iterate over children
- in the right order (and don't reverse the order of the text)."""
- html = b"""
- <a href="somewhere">Welcome <b>the bold <u>and underlined text <svg>
- with a cheeky SVG</svg></u> and <strong>some</strong> tail text</b></a>
- """
- tree = decode_body(html, "http://example.com/test.html")
- og = parse_html_to_open_graph(tree)
- self.assertEqual(
- og,
- {
- "og:title": None,
- "og:description": "Welcome\n\nthe bold\n\nand underlined text\n\nand\n\nsome\n\ntail text",
- },
- )
-
-
-class MediaEncodingTestCase(unittest.TestCase):
- def test_meta_charset(self) -> None:
- """A character encoding is found via the meta tag."""
- encodings = _get_html_media_encodings(
- b"""
- <html>
- <head><meta charset="ascii">
- </head>
- </html>
- """,
- "text/html",
- )
- self.assertEqual(list(encodings), ["ascii", "utf-8", "cp1252"])
-
- # A less well-formed version.
- encodings = _get_html_media_encodings(
- b"""
- <html>
- <head>< meta charset = ascii>
- </head>
- </html>
- """,
- "text/html",
- )
- self.assertEqual(list(encodings), ["ascii", "utf-8", "cp1252"])
-
- def test_meta_charset_underscores(self) -> None:
- """A character encoding contains underscore."""
- encodings = _get_html_media_encodings(
- b"""
- <html>
- <head><meta charset="Shift_JIS">
- </head>
- </html>
- """,
- "text/html",
- )
- self.assertEqual(list(encodings), ["shift_jis", "utf-8", "cp1252"])
-
- def test_xml_encoding(self) -> None:
- """A character encoding is found via the meta tag."""
- encodings = _get_html_media_encodings(
- b"""
- <?xml version="1.0" encoding="ascii"?>
- <html>
- </html>
- """,
- "text/html",
- )
- self.assertEqual(list(encodings), ["ascii", "utf-8", "cp1252"])
-
- def test_meta_xml_encoding(self) -> None:
- """Meta tags take precedence over XML encoding."""
- encodings = _get_html_media_encodings(
- b"""
- <?xml version="1.0" encoding="ascii"?>
- <html>
- <head><meta charset="UTF-16">
- </head>
- </html>
- """,
- "text/html",
- )
- self.assertEqual(list(encodings), ["utf-16", "ascii", "utf-8", "cp1252"])
-
- def test_content_type(self) -> None:
- """A character encoding is found via the Content-Type header."""
- # Test a few variations of the header.
- headers = (
- 'text/html; charset="ascii";',
- "text/html;charset=ascii;",
- 'text/html; charset="ascii"',
- "text/html; charset=ascii",
- 'text/html; charset="ascii;',
- 'text/html; charset=ascii";',
- )
- for header in headers:
- encodings = _get_html_media_encodings(b"", header)
- self.assertEqual(list(encodings), ["ascii", "utf-8", "cp1252"])
-
- def test_fallback(self) -> None:
- """A character encoding cannot be found in the body or header."""
- encodings = _get_html_media_encodings(b"", "text/html")
- self.assertEqual(list(encodings), ["utf-8", "cp1252"])
-
- def test_duplicates(self) -> None:
- """Ensure each encoding is only attempted once."""
- encodings = _get_html_media_encodings(
- b"""
- <?xml version="1.0" encoding="utf8"?>
- <html>
- <head><meta charset="UTF-8">
- </head>
- </html>
- """,
- 'text/html; charset="UTF_8"',
- )
- self.assertEqual(list(encodings), ["utf-8", "cp1252"])
-
- def test_unknown_invalid(self) -> None:
- """A character encoding should be ignored if it is unknown or invalid."""
- encodings = _get_html_media_encodings(
- b"""
- <html>
- <head><meta charset="invalid">
- </head>
- </html>
- """,
- 'text/html; charset="invalid"',
- )
- self.assertEqual(list(encodings), ["utf-8", "cp1252"])
diff --git a/tests/rest/media/v1/test_media_storage.py b/tests/rest/media/v1/test_media_storage.py
deleted file mode 100644
index d18fc13c21..0000000000
--- a/tests/rest/media/v1/test_media_storage.py
+++ /dev/null
@@ -1,782 +0,0 @@
-# Copyright 2018-2021 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import os
-import shutil
-import tempfile
-from binascii import unhexlify
-from io import BytesIO
-from typing import Any, BinaryIO, Dict, List, Optional, Union
-from unittest.mock import Mock
-from urllib import parse
-
-import attr
-from parameterized import parameterized, parameterized_class
-from PIL import Image as Image
-from typing_extensions import Literal
-
-from twisted.internet import defer
-from twisted.internet.defer import Deferred
-from twisted.test.proto_helpers import MemoryReactor
-
-from synapse.api.errors import Codes
-from synapse.events import EventBase
-from synapse.events.spamcheck import load_legacy_spam_checkers
-from synapse.logging.context import make_deferred_yieldable
-from synapse.module_api import ModuleApi
-from synapse.rest import admin
-from synapse.rest.client import login
-from synapse.rest.media.v1._base import FileInfo
-from synapse.rest.media.v1.filepath import MediaFilePaths
-from synapse.rest.media.v1.media_storage import MediaStorage, ReadableFileWrapper
-from synapse.rest.media.v1.storage_provider import FileStorageProviderBackend
-from synapse.server import HomeServer
-from synapse.types import RoomAlias
-from synapse.util import Clock
-
-from tests import unittest
-from tests.server import FakeChannel, FakeSite, make_request
-from tests.test_utils import SMALL_PNG
-from tests.utils import default_config
-
-
-class MediaStorageTests(unittest.HomeserverTestCase):
-
- needs_threadpool = True
-
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.test_dir = tempfile.mkdtemp(prefix="synapse-tests-")
- self.addCleanup(shutil.rmtree, self.test_dir)
-
- self.primary_base_path = os.path.join(self.test_dir, "primary")
- self.secondary_base_path = os.path.join(self.test_dir, "secondary")
-
- hs.config.media.media_store_path = self.primary_base_path
-
- storage_providers = [FileStorageProviderBackend(hs, self.secondary_base_path)]
-
- self.filepaths = MediaFilePaths(self.primary_base_path)
- self.media_storage = MediaStorage(
- hs, self.primary_base_path, self.filepaths, storage_providers
- )
-
- def test_ensure_media_is_in_local_cache(self) -> None:
- media_id = "some_media_id"
- test_body = "Test\n"
-
- # First we create a file that is in a storage provider but not in the
- # local primary media store
- rel_path = self.filepaths.local_media_filepath_rel(media_id)
- secondary_path = os.path.join(self.secondary_base_path, rel_path)
-
- os.makedirs(os.path.dirname(secondary_path))
-
- with open(secondary_path, "w") as f:
- f.write(test_body)
-
- # Now we run ensure_media_is_in_local_cache, which should copy the file
- # to the local cache.
- file_info = FileInfo(None, media_id)
-
- # This uses a real blocking threadpool so we have to wait for it to be
- # actually done :/
- x = defer.ensureDeferred(
- self.media_storage.ensure_media_is_in_local_cache(file_info)
- )
-
- # Hotloop until the threadpool does its job...
- self.wait_on_thread(x)
-
- local_path = self.get_success(x)
-
- self.assertTrue(os.path.exists(local_path))
-
- # Asserts the file is under the expected local cache directory
- self.assertEqual(
- os.path.commonprefix([self.primary_base_path, local_path]),
- self.primary_base_path,
- )
-
- with open(local_path) as f:
- body = f.read()
-
- self.assertEqual(test_body, body)
-
-
-@attr.s(auto_attribs=True, slots=True, frozen=True)
-class _TestImage:
- """An image for testing thumbnailing with the expected results
-
- Attributes:
- data: The raw image to thumbnail
- content_type: The type of the image as a content type, e.g. "image/png"
- extension: The extension associated with the format, e.g. ".png"
- expected_cropped: The expected bytes from cropped thumbnailing, or None if
- test should just check for success.
- expected_scaled: The expected bytes from scaled thumbnailing, or None if
- test should just check for a valid image returned.
- expected_found: True if the file should exist on the server, or False if
- a 404/400 is expected.
- unable_to_thumbnail: True if we expect the thumbnailing to fail (400), or
- False if the thumbnailing should succeed or a normal 404 is expected.
- """
-
- data: bytes
- content_type: bytes
- extension: bytes
- expected_cropped: Optional[bytes] = None
- expected_scaled: Optional[bytes] = None
- expected_found: bool = True
- unable_to_thumbnail: bool = False
-
-
-@parameterized_class(
- ("test_image",),
- [
- # small png
- (
- _TestImage(
- SMALL_PNG,
- b"image/png",
- b".png",
- unhexlify(
- b"89504e470d0a1a0a0000000d4948445200000020000000200806"
- b"000000737a7af40000001a49444154789cedc101010000008220"
- b"ffaf6e484001000000ef0610200001194334ee0000000049454e"
- b"44ae426082"
- ),
- unhexlify(
- b"89504e470d0a1a0a0000000d4948445200000001000000010806"
- b"0000001f15c4890000000d49444154789c636060606000000005"
- b"0001a5f645400000000049454e44ae426082"
- ),
- ),
- ),
- # small png with transparency.
- (
- _TestImage(
- unhexlify(
- b"89504e470d0a1a0a0000000d49484452000000010000000101000"
- b"00000376ef9240000000274524e5300010194fdae0000000a4944"
- b"4154789c636800000082008177cd72b60000000049454e44ae426"
- b"082"
- ),
- b"image/png",
- b".png",
- # Note that we don't check the output since it varies across
- # different versions of Pillow.
- ),
- ),
- # small lossless webp
- (
- _TestImage(
- unhexlify(
- b"524946461a000000574542505650384c0d0000002f0000001007"
- b"1011118888fe0700"
- ),
- b"image/webp",
- b".webp",
- ),
- ),
- # an empty file
- (
- _TestImage(
- b"",
- b"image/gif",
- b".gif",
- expected_found=False,
- unable_to_thumbnail=True,
- ),
- ),
- ],
-)
-class MediaRepoTests(unittest.HomeserverTestCase):
-
- hijack_auth = True
- user_id = "@test:user"
-
- def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-
- self.fetches = []
-
- def get_file(
- destination: str,
- path: str,
- output_stream: BinaryIO,
- args: Optional[Dict[str, Union[str, List[str]]]] = None,
- max_size: Optional[int] = None,
- ) -> Deferred:
- """
- Returns tuple[int,dict,str,int] of file length, response headers,
- absolute URI, and response code.
- """
-
- def write_to(r):
- data, response = r
- output_stream.write(data)
- return response
-
- d = Deferred()
- d.addCallback(write_to)
- self.fetches.append((d, destination, path, args))
- return make_deferred_yieldable(d)
-
- client = Mock()
- client.get_file = get_file
-
- self.storage_path = self.mktemp()
- self.media_store_path = self.mktemp()
- os.mkdir(self.storage_path)
- os.mkdir(self.media_store_path)
-
- config = self.default_config()
- config["media_store_path"] = self.media_store_path
- config["max_image_pixels"] = 2000000
-
- provider_config = {
- "module": "synapse.rest.media.v1.storage_provider.FileStorageProviderBackend",
- "store_local": True,
- "store_synchronous": False,
- "store_remote": True,
- "config": {"directory": self.storage_path},
- }
- config["media_storage_providers"] = [provider_config]
-
- hs = self.setup_test_homeserver(config=config, federation_http_client=client)
-
- return hs
-
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
-
- media_resource = hs.get_media_repository_resource()
- self.download_resource = media_resource.children[b"download"]
- self.thumbnail_resource = media_resource.children[b"thumbnail"]
- self.store = hs.get_datastores().main
- self.media_repo = hs.get_media_repository()
-
- self.media_id = "example.com/12345"
-
- def _req(
- self, content_disposition: Optional[bytes], include_content_type: bool = True
- ) -> FakeChannel:
- channel = make_request(
- self.reactor,
- FakeSite(self.download_resource, self.reactor),
- "GET",
- self.media_id,
- shorthand=False,
- await_result=False,
- )
- self.pump()
-
- # We've made one fetch, to example.com, using the media URL, and asking
- # the other server not to do a remote fetch
- self.assertEqual(len(self.fetches), 1)
- self.assertEqual(self.fetches[0][1], "example.com")
- self.assertEqual(
- self.fetches[0][2], "/_matrix/media/r0/download/" + self.media_id
- )
- self.assertEqual(self.fetches[0][3], {"allow_remote": "false"})
-
- headers = {
- b"Content-Length": [b"%d" % (len(self.test_image.data))],
- }
-
- if include_content_type:
- headers[b"Content-Type"] = [self.test_image.content_type]
-
- if content_disposition:
- headers[b"Content-Disposition"] = [content_disposition]
-
- self.fetches[0][0].callback(
- (self.test_image.data, (len(self.test_image.data), headers))
- )
-
- self.pump()
- self.assertEqual(channel.code, 200)
-
- return channel
-
- def test_handle_missing_content_type(self) -> None:
- channel = self._req(
- b"inline; filename=out" + self.test_image.extension,
- include_content_type=False,
- )
- headers = channel.headers
- self.assertEqual(channel.code, 200)
- self.assertEqual(
- headers.getRawHeaders(b"Content-Type"), [b"application/octet-stream"]
- )
-
- def test_disposition_filename_ascii(self) -> None:
- """
- If the filename is filename=<ascii> then Synapse will decode it as an
- ASCII string, and use filename= in the response.
- """
- channel = self._req(b"inline; filename=out" + self.test_image.extension)
-
- headers = channel.headers
- self.assertEqual(
- headers.getRawHeaders(b"Content-Type"), [self.test_image.content_type]
- )
- self.assertEqual(
- headers.getRawHeaders(b"Content-Disposition"),
- [b"inline; filename=out" + self.test_image.extension],
- )
-
- def test_disposition_filenamestar_utf8escaped(self) -> None:
- """
- If the filename is filename=*utf8''<utf8 escaped> then Synapse will
- correctly decode it as the UTF-8 string, and use filename* in the
- response.
- """
- filename = parse.quote("\u2603".encode()).encode("ascii")
- channel = self._req(
- b"inline; filename*=utf-8''" + filename + self.test_image.extension
- )
-
- headers = channel.headers
- self.assertEqual(
- headers.getRawHeaders(b"Content-Type"), [self.test_image.content_type]
- )
- self.assertEqual(
- headers.getRawHeaders(b"Content-Disposition"),
- [b"inline; filename*=utf-8''" + filename + self.test_image.extension],
- )
-
- def test_disposition_none(self) -> None:
- """
- If there is no filename, one isn't passed on in the Content-Disposition
- of the request.
- """
- channel = self._req(None)
-
- headers = channel.headers
- self.assertEqual(
- headers.getRawHeaders(b"Content-Type"), [self.test_image.content_type]
- )
- self.assertEqual(headers.getRawHeaders(b"Content-Disposition"), None)
-
- def test_thumbnail_crop(self) -> None:
- """Test that a cropped remote thumbnail is available."""
- self._test_thumbnail(
- "crop",
- self.test_image.expected_cropped,
- expected_found=self.test_image.expected_found,
- unable_to_thumbnail=self.test_image.unable_to_thumbnail,
- )
-
- def test_thumbnail_scale(self) -> None:
- """Test that a scaled remote thumbnail is available."""
- self._test_thumbnail(
- "scale",
- self.test_image.expected_scaled,
- expected_found=self.test_image.expected_found,
- unable_to_thumbnail=self.test_image.unable_to_thumbnail,
- )
-
- def test_invalid_type(self) -> None:
- """An invalid thumbnail type is never available."""
- self._test_thumbnail(
- "invalid",
- None,
- expected_found=False,
- unable_to_thumbnail=self.test_image.unable_to_thumbnail,
- )
-
- @unittest.override_config(
- {"thumbnail_sizes": [{"width": 32, "height": 32, "method": "scale"}]}
- )
- def test_no_thumbnail_crop(self) -> None:
- """
- Override the config to generate only scaled thumbnails, but request a cropped one.
- """
- self._test_thumbnail(
- "crop",
- None,
- expected_found=False,
- unable_to_thumbnail=self.test_image.unable_to_thumbnail,
- )
-
- @unittest.override_config(
- {"thumbnail_sizes": [{"width": 32, "height": 32, "method": "crop"}]}
- )
- def test_no_thumbnail_scale(self) -> None:
- """
- Override the config to generate only cropped thumbnails, but request a scaled one.
- """
- self._test_thumbnail(
- "scale",
- None,
- expected_found=False,
- unable_to_thumbnail=self.test_image.unable_to_thumbnail,
- )
-
- def test_thumbnail_repeated_thumbnail(self) -> None:
- """Test that fetching the same thumbnail works, and deleting the on disk
- thumbnail regenerates it.
- """
- self._test_thumbnail(
- "scale",
- self.test_image.expected_scaled,
- expected_found=self.test_image.expected_found,
- unable_to_thumbnail=self.test_image.unable_to_thumbnail,
- )
-
- if not self.test_image.expected_found:
- return
-
- # Fetching again should work, without re-requesting the image from the
- # remote.
- params = "?width=32&height=32&method=scale"
- channel = make_request(
- self.reactor,
- FakeSite(self.thumbnail_resource, self.reactor),
- "GET",
- self.media_id + params,
- shorthand=False,
- await_result=False,
- )
- self.pump()
-
- self.assertEqual(channel.code, 200)
- if self.test_image.expected_scaled:
- self.assertEqual(
- channel.result["body"],
- self.test_image.expected_scaled,
- channel.result["body"],
- )
-
- # Deleting the thumbnail on disk then re-requesting it should work as
- # Synapse should regenerate missing thumbnails.
- origin, media_id = self.media_id.split("/")
- info = self.get_success(self.store.get_cached_remote_media(origin, media_id))
- file_id = info["filesystem_id"]
-
- thumbnail_dir = self.media_repo.filepaths.remote_media_thumbnail_dir(
- origin, file_id
- )
- shutil.rmtree(thumbnail_dir, ignore_errors=True)
-
- channel = make_request(
- self.reactor,
- FakeSite(self.thumbnail_resource, self.reactor),
- "GET",
- self.media_id + params,
- shorthand=False,
- await_result=False,
- )
- self.pump()
-
- self.assertEqual(channel.code, 200)
- if self.test_image.expected_scaled:
- self.assertEqual(
- channel.result["body"],
- self.test_image.expected_scaled,
- channel.result["body"],
- )
-
- def _test_thumbnail(
- self,
- method: str,
- expected_body: Optional[bytes],
- expected_found: bool,
- unable_to_thumbnail: bool = False,
- ) -> None:
- """Test the given thumbnailing method works as expected.
-
- Args:
- method: The thumbnailing method to use (crop, scale).
- expected_body: The expected bytes from thumbnailing, or None if
- test should just check for a valid image.
- expected_found: True if the file should exist on the server, or False if
- a 404/400 is expected.
- unable_to_thumbnail: True if we expect the thumbnailing to fail (400), or
- False if the thumbnailing should succeed or a normal 404 is expected.
- """
-
- params = "?width=32&height=32&method=" + method
- channel = make_request(
- self.reactor,
- FakeSite(self.thumbnail_resource, self.reactor),
- "GET",
- self.media_id + params,
- shorthand=False,
- await_result=False,
- )
- self.pump()
-
- headers = {
- b"Content-Length": [b"%d" % (len(self.test_image.data))],
- b"Content-Type": [self.test_image.content_type],
- }
- self.fetches[0][0].callback(
- (self.test_image.data, (len(self.test_image.data), headers))
- )
- self.pump()
-
- if expected_found:
- self.assertEqual(channel.code, 200)
-
- self.assertEqual(
- channel.headers.getRawHeaders(b"Cross-Origin-Resource-Policy"),
- [b"cross-origin"],
- )
-
- if expected_body is not None:
- self.assertEqual(
- channel.result["body"], expected_body, channel.result["body"]
- )
- else:
- # ensure that the result is at least some valid image
- Image.open(BytesIO(channel.result["body"]))
- elif unable_to_thumbnail:
- # A 400 with a JSON body.
- self.assertEqual(channel.code, 400)
- self.assertEqual(
- channel.json_body,
- {
- "errcode": "M_UNKNOWN",
- "error": "Cannot find any thumbnails for the requested media ([b'example.com', b'12345']). This might mean the media is not a supported_media_format=(image/jpeg, image/jpg, image/webp, image/gif, image/png) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)",
- },
- )
- else:
- # A 404 with a JSON body.
- self.assertEqual(channel.code, 404)
- self.assertEqual(
- channel.json_body,
- {
- "errcode": "M_NOT_FOUND",
- "error": "Not found [b'example.com', b'12345']",
- },
- )
-
- @parameterized.expand([("crop", 16), ("crop", 64), ("scale", 16), ("scale", 64)])
- def test_same_quality(self, method: str, desired_size: int) -> None:
- """Test that choosing between thumbnails with the same quality rating succeeds.
-
- We are not particular about which thumbnail is chosen."""
- self.assertIsNotNone(
- self.thumbnail_resource._select_thumbnail(
- desired_width=desired_size,
- desired_height=desired_size,
- desired_method=method,
- desired_type=self.test_image.content_type,
- # Provide two identical thumbnails which are guaranteed to have the same
- # quality rating.
- thumbnail_infos=[
- {
- "thumbnail_width": 32,
- "thumbnail_height": 32,
- "thumbnail_method": method,
- "thumbnail_type": self.test_image.content_type,
- "thumbnail_length": 256,
- "filesystem_id": f"thumbnail1{self.test_image.extension}",
- },
- {
- "thumbnail_width": 32,
- "thumbnail_height": 32,
- "thumbnail_method": method,
- "thumbnail_type": self.test_image.content_type,
- "thumbnail_length": 256,
- "filesystem_id": f"thumbnail2{self.test_image.extension}",
- },
- ],
- file_id=f"image{self.test_image.extension}",
- url_cache=None,
- server_name=None,
- )
- )
-
- def test_x_robots_tag_header(self) -> None:
- """
- Tests that the `X-Robots-Tag` header is present, which informs web crawlers
- to not index, archive, or follow links in media.
- """
- channel = self._req(b"inline; filename=out" + self.test_image.extension)
-
- headers = channel.headers
- self.assertEqual(
- headers.getRawHeaders(b"X-Robots-Tag"),
- [b"noindex, nofollow, noarchive, noimageindex"],
- )
-
- def test_cross_origin_resource_policy_header(self) -> None:
- """
- Test that the Cross-Origin-Resource-Policy header is set to "cross-origin"
- allowing web clients to embed media from the downloads API.
- """
- channel = self._req(b"inline; filename=out" + self.test_image.extension)
-
- headers = channel.headers
-
- self.assertEqual(
- headers.getRawHeaders(b"Cross-Origin-Resource-Policy"),
- [b"cross-origin"],
- )
-
-
-class TestSpamCheckerLegacy:
- """A spam checker module that rejects all media that includes the bytes
- `evil`.
-
- Uses the legacy Spam-Checker API.
- """
-
- def __init__(self, config: Dict[str, Any], api: ModuleApi) -> None:
- self.config = config
- self.api = api
-
- def parse_config(config: Dict[str, Any]) -> Dict[str, Any]:
- return config
-
- async def check_event_for_spam(self, event: EventBase) -> Union[bool, str]:
- return False # allow all events
-
- async def user_may_invite(
- self,
- inviter_userid: str,
- invitee_userid: str,
- room_id: str,
- ) -> bool:
- return True # allow all invites
-
- async def user_may_create_room(self, userid: str) -> bool:
- return True # allow all room creations
-
- async def user_may_create_room_alias(
- self, userid: str, room_alias: RoomAlias
- ) -> bool:
- return True # allow all room aliases
-
- async def user_may_publish_room(self, userid: str, room_id: str) -> bool:
- return True # allow publishing of all rooms
-
- async def check_media_file_for_spam(
- self, file_wrapper: ReadableFileWrapper, file_info: FileInfo
- ) -> bool:
- buf = BytesIO()
- await file_wrapper.write_chunks_to(buf.write)
-
- return b"evil" in buf.getvalue()
-
-
-class SpamCheckerTestCaseLegacy(unittest.HomeserverTestCase):
- servlets = [
- login.register_servlets,
- admin.register_servlets,
- ]
-
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.user = self.register_user("user", "pass")
- self.tok = self.login("user", "pass")
-
- # Allow for uploading and downloading to/from the media repo
- self.media_repo = hs.get_media_repository_resource()
- self.download_resource = self.media_repo.children[b"download"]
- self.upload_resource = self.media_repo.children[b"upload"]
-
- load_legacy_spam_checkers(hs)
-
- def default_config(self) -> Dict[str, Any]:
- config = default_config("test")
-
- config.update(
- {
- "spam_checker": [
- {
- "module": TestSpamCheckerLegacy.__module__
- + ".TestSpamCheckerLegacy",
- "config": {},
- }
- ]
- }
- )
-
- return config
-
- def test_upload_innocent(self) -> None:
- """Attempt to upload some innocent data that should be allowed."""
- self.helper.upload_media(
- self.upload_resource, SMALL_PNG, tok=self.tok, expect_code=200
- )
-
- def test_upload_ban(self) -> None:
- """Attempt to upload some data that includes bytes "evil", which should
- get rejected by the spam checker.
- """
-
- data = b"Some evil data"
-
- self.helper.upload_media(
- self.upload_resource, data, tok=self.tok, expect_code=400
- )
-
-
-EVIL_DATA = b"Some evil data"
-EVIL_DATA_EXPERIMENT = b"Some evil data to trigger the experimental tuple API"
-
-
-class SpamCheckerTestCase(unittest.HomeserverTestCase):
- servlets = [
- login.register_servlets,
- admin.register_servlets,
- ]
-
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.user = self.register_user("user", "pass")
- self.tok = self.login("user", "pass")
-
- # Allow for uploading and downloading to/from the media repo
- self.media_repo = hs.get_media_repository_resource()
- self.download_resource = self.media_repo.children[b"download"]
- self.upload_resource = self.media_repo.children[b"upload"]
-
- hs.get_module_api().register_spam_checker_callbacks(
- check_media_file_for_spam=self.check_media_file_for_spam
- )
-
- async def check_media_file_for_spam(
- self, file_wrapper: ReadableFileWrapper, file_info: FileInfo
- ) -> Union[Codes, Literal["NOT_SPAM"]]:
- buf = BytesIO()
- await file_wrapper.write_chunks_to(buf.write)
-
- if buf.getvalue() == EVIL_DATA:
- return Codes.FORBIDDEN
- elif buf.getvalue() == EVIL_DATA_EXPERIMENT:
- return (Codes.FORBIDDEN, {})
- else:
- return "NOT_SPAM"
-
- def test_upload_innocent(self) -> None:
- """Attempt to upload some innocent data that should be allowed."""
- self.helper.upload_media(
- self.upload_resource, SMALL_PNG, tok=self.tok, expect_code=200
- )
-
- def test_upload_ban(self) -> None:
- """Attempt to upload some data that includes bytes "evil", which should
- get rejected by the spam checker.
- """
-
- self.helper.upload_media(
- self.upload_resource, EVIL_DATA, tok=self.tok, expect_code=400
- )
-
- self.helper.upload_media(
- self.upload_resource,
- EVIL_DATA_EXPERIMENT,
- tok=self.tok,
- expect_code=400,
- )
diff --git a/tests/rest/media/v1/test_oembed.py b/tests/rest/media/v1/test_oembed.py
deleted file mode 100644
index 3f7f1dbab9..0000000000
--- a/tests/rest/media/v1/test_oembed.py
+++ /dev/null
@@ -1,162 +0,0 @@
-# Copyright 2021 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-
-from parameterized import parameterized
-
-from twisted.test.proto_helpers import MemoryReactor
-
-from synapse.rest.media.v1.oembed import OEmbedProvider, OEmbedResult
-from synapse.server import HomeServer
-from synapse.types import JsonDict
-from synapse.util import Clock
-
-from tests.unittest import HomeserverTestCase
-
-try:
- import lxml
-except ImportError:
- lxml = None
-
-
-class OEmbedTests(HomeserverTestCase):
- if not lxml:
- skip = "url preview feature requires lxml"
-
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.oembed = OEmbedProvider(hs)
-
- def parse_response(self, response: JsonDict) -> OEmbedResult:
- return self.oembed.parse_oembed_response(
- "https://test", json.dumps(response).encode("utf-8")
- )
-
- def test_version(self) -> None:
- """Accept versions that are similar to 1.0 as a string or int (or missing)."""
- for version in ("1.0", 1.0, 1):
- result = self.parse_response({"version": version})
- # An empty Open Graph response is an error, ensure the URL is included.
- self.assertIn("og:url", result.open_graph_result)
-
- # A missing version should be treated as 1.0.
- result = self.parse_response({"type": "link"})
- self.assertIn("og:url", result.open_graph_result)
-
- # Invalid versions should be rejected.
- for version in ("2.0", "1", 1.1, 0, None, {}, []):
- result = self.parse_response({"version": version, "type": "link"})
- # An empty Open Graph response is an error, ensure the URL is included.
- self.assertEqual({}, result.open_graph_result)
-
- def test_cache_age(self) -> None:
- """Ensure a cache-age is parsed properly."""
- # Correct-ish cache ages are allowed.
- for cache_age in ("1", 1.0, 1):
- result = self.parse_response({"cache_age": cache_age})
- self.assertEqual(result.cache_age, 1000)
-
- # Invalid cache ages are ignored.
- for cache_age in ("invalid", {}):
- result = self.parse_response({"cache_age": cache_age})
- self.assertIsNone(result.cache_age)
-
- # Cache age is optional.
- result = self.parse_response({})
- self.assertIsNone(result.cache_age)
-
- @parameterized.expand(
- [
- ("title", "title"),
- ("provider_name", "site_name"),
- ("thumbnail_url", "image"),
- ],
- name_func=lambda func, num, p: f"{func.__name__}_{p.args[0]}",
- )
- def test_property(self, oembed_property: str, open_graph_property: str) -> None:
- """Test properties which must be strings."""
- result = self.parse_response({oembed_property: "test"})
- self.assertIn(f"og:{open_graph_property}", result.open_graph_result)
- self.assertEqual(result.open_graph_result[f"og:{open_graph_property}"], "test")
-
- result = self.parse_response({oembed_property: 1})
- self.assertNotIn(f"og:{open_graph_property}", result.open_graph_result)
-
- def test_author_name(self) -> None:
- """Test the author_name property."""
- result = self.parse_response({"author_name": "test"})
- self.assertEqual(result.author_name, "test")
-
- result = self.parse_response({"author_name": 1})
- self.assertIsNone(result.author_name)
-
- def test_rich(self) -> None:
- """Test a type of rich."""
- result = self.parse_response({"html": "test<img src='foo'>", "type": "rich"})
- self.assertIn("og:description", result.open_graph_result)
- self.assertIn("og:image", result.open_graph_result)
- self.assertEqual(result.open_graph_result["og:description"], "test")
- self.assertEqual(result.open_graph_result["og:image"], "foo")
-
- result = self.parse_response({"type": "rich"})
- self.assertNotIn("og:description", result.open_graph_result)
-
- result = self.parse_response({"html": 1, "type": "rich"})
- self.assertNotIn("og:description", result.open_graph_result)
-
- def test_photo(self) -> None:
- """Test a type of photo."""
- result = self.parse_response({"url": "test", "type": "photo"})
- self.assertIn("og:image", result.open_graph_result)
- self.assertEqual(result.open_graph_result["og:image"], "test")
-
- result = self.parse_response({"type": "photo"})
- self.assertNotIn("og:image", result.open_graph_result)
-
- result = self.parse_response({"url": 1, "type": "photo"})
- self.assertNotIn("og:image", result.open_graph_result)
-
- def test_video(self) -> None:
- """Test a type of video."""
- result = self.parse_response({"html": "test", "type": "video"})
- self.assertIn("og:type", result.open_graph_result)
- self.assertEqual(result.open_graph_result["og:type"], "video.other")
- self.assertIn("og:description", result.open_graph_result)
- self.assertEqual(result.open_graph_result["og:description"], "test")
-
- result = self.parse_response({"type": "video"})
- self.assertIn("og:type", result.open_graph_result)
- self.assertEqual(result.open_graph_result["og:type"], "video.other")
- self.assertNotIn("og:description", result.open_graph_result)
-
- result = self.parse_response({"url": 1, "type": "video"})
- self.assertIn("og:type", result.open_graph_result)
- self.assertEqual(result.open_graph_result["og:type"], "video.other")
- self.assertNotIn("og:description", result.open_graph_result)
-
- def test_link(self) -> None:
- """Test type of link."""
- result = self.parse_response({"type": "link"})
- self.assertIn("og:type", result.open_graph_result)
- self.assertEqual(result.open_graph_result["og:type"], "website")
-
- def test_title_html_entities(self) -> None:
- """Test HTML entities in title"""
- result = self.parse_response(
- {"title": "Why JSON isn’t a Good Configuration Language"}
- )
- self.assertEqual(
- result.open_graph_result["og:title"],
- "Why JSON isn’t a Good Configuration Language",
- )
|