diff --git a/changelog.d/14089.bugfix b/changelog.d/14089.bugfix
new file mode 100644
index 0000000000..4a398921bb
--- /dev/null
+++ b/changelog.d/14089.bugfix
@@ -0,0 +1 @@
+Fix a bug where invalid oEmbed fields would cause the entire response to be discarded. Introduced in Synapse 1.18.0.
diff --git a/synapse/rest/media/v1/oembed.py b/synapse/rest/media/v1/oembed.py
index 2177b46c9e..827afd868d 100644
--- a/synapse/rest/media/v1/oembed.py
+++ b/synapse/rest/media/v1/oembed.py
@@ -139,65 +139,72 @@ class OEmbedProvider:
try:
# oEmbed responses *must* be UTF-8 according to the spec.
oembed = json_decoder.decode(raw_body.decode("utf-8"))
+ except ValueError:
+ return OEmbedResult({}, None, None)
- # The version is a required string field, but not always provided,
- # or sometimes provided as a float. Be lenient.
- oembed_version = oembed.get("version", "1.0")
- if oembed_version != "1.0" and oembed_version != 1:
- raise RuntimeError(f"Invalid oEmbed version: {oembed_version}")
+ # The version is a required string field, but not always provided,
+ # or sometimes provided as a float. Be lenient.
+ oembed_version = oembed.get("version", "1.0")
+ if oembed_version != "1.0" and oembed_version != 1:
+ return OEmbedResult({}, None, None)
- # Ensure the cache age is None or an int.
- cache_age = oembed.get("cache_age")
- if cache_age:
- cache_age = int(cache_age) * 1000
-
- # The results.
- open_graph_response = {
- "og:url": url,
- }
-
- title = oembed.get("title")
- if title:
- open_graph_response["og:title"] = title
-
- author_name = oembed.get("author_name")
+ # Attempt to parse the cache age, if possible.
+ try:
+ cache_age = int(oembed.get("cache_age")) * 1000
+ except (TypeError, ValueError):
+ # If the cache age cannot be parsed (e.g. wrong type or invalid
+ # string), ignore it.
+ cache_age = None
- # Use the provider name and as the site.
- provider_name = oembed.get("provider_name")
- if provider_name:
- open_graph_response["og:site_name"] = provider_name
+ # The oEmbed response converted to Open Graph.
+ open_graph_response: JsonDict = {"og:url": url}
- # If a thumbnail exists, use it. Note that dimensions will be calculated later.
- if "thumbnail_url" in oembed:
- open_graph_response["og:image"] = oembed["thumbnail_url"]
+ title = oembed.get("title")
+ if title and isinstance(title, str):
+ open_graph_response["og:title"] = title
- # Process each type separately.
- oembed_type = oembed["type"]
- if oembed_type == "rich":
- calc_description_and_urls(open_graph_response, oembed["html"])
-
- elif oembed_type == "photo":
- # If this is a photo, use the full image, not the thumbnail.
- open_graph_response["og:image"] = oembed["url"]
+ author_name = oembed.get("author_name")
+ if not isinstance(author_name, str):
+ author_name = None
- elif oembed_type == "video":
- open_graph_response["og:type"] = "video.other"
+ # Use the provider name and as the site.
+ provider_name = oembed.get("provider_name")
+ if provider_name and isinstance(provider_name, str):
+ open_graph_response["og:site_name"] = provider_name
+
+ # If a thumbnail exists, use it. Note that dimensions will be calculated later.
+ thumbnail_url = oembed.get("thumbnail_url")
+ if thumbnail_url and isinstance(thumbnail_url, str):
+ open_graph_response["og:image"] = thumbnail_url
+
+ # Process each type separately.
+ oembed_type = oembed.get("type")
+ if oembed_type == "rich":
+ html = oembed.get("html")
+ if isinstance(html, str):
+ calc_description_and_urls(open_graph_response, html)
+
+ elif oembed_type == "photo":
+ # If this is a photo, use the full image, not the thumbnail.
+ url = oembed.get("url")
+ if url and isinstance(url, str):
+ open_graph_response["og:image"] = url
+
+ elif oembed_type == "video":
+ open_graph_response["og:type"] = "video.other"
+ html = oembed.get("html")
+ if html and isinstance(html, str):
calc_description_and_urls(open_graph_response, oembed["html"])
- open_graph_response["og:video:width"] = oembed["width"]
- open_graph_response["og:video:height"] = oembed["height"]
-
- elif oembed_type == "link":
- open_graph_response["og:type"] = "website"
+ for size in ("width", "height"):
+ val = oembed.get(size)
+ if val is not None and isinstance(val, int):
+ open_graph_response[f"og:video:{size}"] = val
- else:
- raise RuntimeError(f"Unknown oEmbed type: {oembed_type}")
+ elif oembed_type == "link":
+ open_graph_response["og:type"] = "website"
- except Exception as e:
- # Trap any exception and let the code follow as usual.
- logger.warning("Error parsing oEmbed metadata from %s: %r", url, e)
- open_graph_response = {}
- author_name = None
- cache_age = None
+ else:
+ logger.warning("Unknown oEmbed type: %s", oembed_type)
return OEmbedResult(open_graph_response, author_name, cache_age)
diff --git a/tests/rest/media/v1/test_oembed.py b/tests/rest/media/v1/test_oembed.py
index f38d7225f8..319ae8b1cc 100644
--- a/tests/rest/media/v1/test_oembed.py
+++ b/tests/rest/media/v1/test_oembed.py
@@ -14,6 +14,8 @@
import json
+from parameterized import parameterized
+
from twisted.test.proto_helpers import MemoryReactor
from synapse.rest.media.v1.oembed import OEmbedProvider, OEmbedResult
@@ -23,8 +25,16 @@ from synapse.util import Clock
from tests.unittest import HomeserverTestCase
+try:
+ import lxml
+except ImportError:
+ lxml = None
+
class OEmbedTests(HomeserverTestCase):
+ if not lxml:
+ skip = "url preview feature requires lxml"
+
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.oembed = OEmbedProvider(hs)
@@ -36,7 +46,7 @@ class OEmbedTests(HomeserverTestCase):
def test_version(self) -> None:
"""Accept versions that are similar to 1.0 as a string or int (or missing)."""
for version in ("1.0", 1.0, 1):
- result = self.parse_response({"version": version, "type": "link"})
+ result = self.parse_response({"version": version})
# An empty Open Graph response is an error, ensure the URL is included.
self.assertIn("og:url", result.open_graph_result)
@@ -49,3 +59,94 @@ class OEmbedTests(HomeserverTestCase):
result = self.parse_response({"version": version, "type": "link"})
# An empty Open Graph response is an error, ensure the URL is included.
self.assertEqual({}, result.open_graph_result)
+
+ def test_cache_age(self) -> None:
+ """Ensure a cache-age is parsed properly."""
+ # Correct-ish cache ages are allowed.
+ for cache_age in ("1", 1.0, 1):
+ result = self.parse_response({"cache_age": cache_age})
+ self.assertEqual(result.cache_age, 1000)
+
+ # Invalid cache ages are ignored.
+ for cache_age in ("invalid", {}):
+ result = self.parse_response({"cache_age": cache_age})
+ self.assertIsNone(result.cache_age)
+
+ # Cache age is optional.
+ result = self.parse_response({})
+ self.assertIsNone(result.cache_age)
+
+ @parameterized.expand(
+ [
+ ("title", "title"),
+ ("provider_name", "site_name"),
+ ("thumbnail_url", "image"),
+ ],
+ name_func=lambda func, num, p: f"{func.__name__}_{p.args[0]}",
+ )
+ def test_property(self, oembed_property: str, open_graph_property: str) -> None:
+ """Test properties which must be strings."""
+ result = self.parse_response({oembed_property: "test"})
+ self.assertIn(f"og:{open_graph_property}", result.open_graph_result)
+ self.assertEqual(result.open_graph_result[f"og:{open_graph_property}"], "test")
+
+ result = self.parse_response({oembed_property: 1})
+ self.assertNotIn(f"og:{open_graph_property}", result.open_graph_result)
+
+ def test_author_name(self) -> None:
+ """Test the author_name property."""
+ result = self.parse_response({"author_name": "test"})
+ self.assertEqual(result.author_name, "test")
+
+ result = self.parse_response({"author_name": 1})
+ self.assertIsNone(result.author_name)
+
+ def test_rich(self) -> None:
+ """Test a type of rich."""
+ result = self.parse_response({"html": "test<img src='foo'>", "type": "rich"})
+ self.assertIn("og:description", result.open_graph_result)
+ self.assertIn("og:image", result.open_graph_result)
+ self.assertEqual(result.open_graph_result["og:description"], "test")
+ self.assertEqual(result.open_graph_result["og:image"], "foo")
+
+ result = self.parse_response({"type": "rich"})
+ self.assertNotIn("og:description", result.open_graph_result)
+
+ result = self.parse_response({"html": 1, "type": "rich"})
+ self.assertNotIn("og:description", result.open_graph_result)
+
+ def test_photo(self) -> None:
+ """Test a type of photo."""
+ result = self.parse_response({"url": "test", "type": "photo"})
+ self.assertIn("og:image", result.open_graph_result)
+ self.assertEqual(result.open_graph_result["og:image"], "test")
+
+ result = self.parse_response({"type": "photo"})
+ self.assertNotIn("og:image", result.open_graph_result)
+
+ result = self.parse_response({"url": 1, "type": "photo"})
+ self.assertNotIn("og:image", result.open_graph_result)
+
+ def test_video(self) -> None:
+ """Test a type of video."""
+ result = self.parse_response({"html": "test", "type": "video"})
+ self.assertIn("og:type", result.open_graph_result)
+ self.assertEqual(result.open_graph_result["og:type"], "video.other")
+ self.assertIn("og:description", result.open_graph_result)
+ self.assertEqual(result.open_graph_result["og:description"], "test")
+
+ result = self.parse_response({"type": "video"})
+ self.assertIn("og:type", result.open_graph_result)
+ self.assertEqual(result.open_graph_result["og:type"], "video.other")
+ self.assertNotIn("og:description", result.open_graph_result)
+
+ result = self.parse_response({"url": 1, "type": "video"})
+ self.assertIn("og:type", result.open_graph_result)
+ self.assertEqual(result.open_graph_result["og:type"], "video.other")
+ self.assertNotIn("og:description", result.open_graph_result)
+
+ def test_link(self) -> None:
+ """Test type of link."""
+ result = self.parse_response({"type": "link"})
+ self.assertIn("og:type", result.open_graph_result)
+ self.assertEqual(result.open_graph_result["og:type"], "website")
|