diff options
Diffstat (limited to 'synapse/rest/media/v1/oembed.py')
-rw-r--r-- | synapse/rest/media/v1/oembed.py | 145 |
1 files changed, 88 insertions, 57 deletions
diff --git a/synapse/rest/media/v1/oembed.py b/synapse/rest/media/v1/oembed.py index 2e6706dbfa..8b74e72655 100644 --- a/synapse/rest/media/v1/oembed.py +++ b/synapse/rest/media/v1/oembed.py @@ -12,11 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import urllib.parse from typing import TYPE_CHECKING, Optional import attr from synapse.http.client import SimpleHttpClient +from synapse.types import JsonDict +from synapse.util import json_decoder if TYPE_CHECKING: from synapse.server import HomeServer @@ -24,18 +27,15 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -@attr.s(slots=True, auto_attribs=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class OEmbedResult: - # Either HTML content or URL must be provided. - html: Optional[str] - url: Optional[str] - title: Optional[str] - # Number of seconds to cache the content. - cache_age: int - - -class OEmbedError(Exception): - """An error occurred processing the oEmbed object.""" + # The Open Graph result (converted from the oEmbed result). + open_graph_result: JsonDict + # Number of seconds to cache the content, according to the oEmbed response. + # + # This will be None if no cache-age is provided in the oEmbed response (or + # if the oEmbed response cannot be turned into an Open Graph response). + cache_age: Optional[int] class OEmbedProvider: @@ -81,75 +81,106 @@ class OEmbedProvider: """ for url_pattern, endpoint in self._oembed_patterns.items(): if url_pattern.fullmatch(url): - return endpoint + # TODO Specify max height / width. + + # Note that only the JSON format is supported, some endpoints want + # this in the URL, others want it as an argument. + endpoint = endpoint.replace("{format}", "json") + + args = {"url": url, "format": "json"} + query_str = urllib.parse.urlencode(args, True) + return f"{endpoint}?{query_str}" # No match. return None - async def get_oembed_content(self, endpoint: str, url: str) -> OEmbedResult: + def parse_oembed_response(self, url: str, raw_body: bytes) -> OEmbedResult: """ - Request content from an oEmbed endpoint. + Parse the oEmbed response into an Open Graph response. Args: - endpoint: The oEmbed API endpoint. - url: The URL to pass to the API. + url: The URL which is being previewed (not the one which was + requested). + raw_body: The oEmbed response as JSON encoded as bytes. Returns: - An object representing the metadata returned. - - Raises: - OEmbedError if fetching or parsing of the oEmbed information fails. + json-encoded Open Graph data """ - try: - logger.debug("Trying to get oEmbed content for url '%s'", url) - # Note that only the JSON format is supported, some endpoints want - # this in the URL, others want it as an argument. - endpoint = endpoint.replace("{format}", "json") - - result = await self._client.get_json( - endpoint, - # TODO Specify max height / width. - args={"url": url, "format": "json"}, - ) + try: + # oEmbed responses *must* be UTF-8 according to the spec. + oembed = json_decoder.decode(raw_body.decode("utf-8")) # Ensure there's a version of 1.0. - if result.get("version") != "1.0": - raise OEmbedError("Invalid version: %s" % (result.get("version"),)) - - oembed_type = result.get("type") + oembed_version = oembed["version"] + if oembed_version != "1.0": + raise RuntimeError(f"Invalid version: {oembed_version}") # Ensure the cache age is None or an int. - cache_age = result.get("cache_age") + cache_age = oembed.get("cache_age") if cache_age: cache_age = int(cache_age) - oembed_result = OEmbedResult(None, None, result.get("title"), cache_age) + # The results. + open_graph_response = {"og:title": oembed.get("title")} - # HTML content. + # If a thumbnail exists, use it. Note that dimensions will be calculated later. + if "thumbnail_url" in oembed: + open_graph_response["og:image"] = oembed["thumbnail_url"] + + # Process each type separately. + oembed_type = oembed["type"] if oembed_type == "rich": - oembed_result.html = result.get("html") - return oembed_result + calc_description_and_urls(open_graph_response, oembed["html"]) - if oembed_type == "photo": - oembed_result.url = result.get("url") - return oembed_result + elif oembed_type == "photo": + # If this is a photo, use the full image, not the thumbnail. + open_graph_response["og:image"] = oembed["url"] - # TODO Handle link and video types. + else: + raise RuntimeError(f"Unknown oEmbed type: {oembed_type}") - if "thumbnail_url" in result: - oembed_result.url = result.get("thumbnail_url") - return oembed_result + except Exception as e: + # Trap any exception and let the code follow as usual. + logger.warning(f"Error parsing oEmbed metadata from {url}: {e:r}") + open_graph_response = {} + cache_age = None - raise OEmbedError("Incompatible oEmbed information.") + return OEmbedResult(open_graph_response, cache_age) - except OEmbedError as e: - # Trap OEmbedErrors first so we can directly re-raise them. - logger.warning("Error parsing oEmbed metadata from %s: %r", url, e) - raise - except Exception as e: - # Trap any exception and let the code follow as usual. - # FIXME: pass through 404s and other error messages nicely - logger.warning("Error downloading oEmbed metadata from %s: %r", url, e) - raise OEmbedError() from e +def calc_description_and_urls(open_graph_response: JsonDict, html_body: str) -> None: + """ + Calculate description for an HTML document. + + This uses lxml to convert the HTML document into plaintext. If errors + occur during processing of the document, an empty response is returned. + + Args: + open_graph_response: The current Open Graph summary. This is updated with additional fields. + html_body: The HTML document, as bytes. + + Returns: + The summary + """ + # If there's no body, nothing useful is going to be found. + if not html_body: + return + + from lxml import etree + + # Create an HTML parser. If this fails, log and return no metadata. + parser = etree.HTMLParser(recover=True, encoding="utf-8") + + # Attempt to parse the body. If this fails, log and return no metadata. + tree = etree.fromstring(html_body, parser) + + # The data was successfully parsed, but no tree was found. + if tree is None: + return + + from synapse.rest.media.v1.preview_url_resource import _calc_description + + description = _calc_description(tree) + if description: + open_graph_response["og:description"] = description |