diff options
author | Shay <hillerys@element.io> | 2024-06-07 05:54:28 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-07 13:54:28 +0100 |
commit | ab94bce02cc6c268d8c3b693cbbbacd8ef926481 (patch) | |
tree | 369dbcbb397a59e29347aa19b09fb4749819def2 /synapse | |
parent | Add debug logging for when room keys are uploaded, including whether they are... (diff) | |
download | synapse-ab94bce02cc6c268d8c3b693cbbbacd8ef926481.tar.xz |
Support MSC3916 by adding a federation `/download` endpoint (#17172)
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/federation/transport/server/__init__.py | 24 | ||||
-rw-r--r-- | synapse/federation/transport/server/_base.py | 24 | ||||
-rw-r--r-- | synapse/federation/transport/server/federation.py | 41 | ||||
-rw-r--r-- | synapse/media/_base.py | 63 | ||||
-rw-r--r-- | synapse/media/media_repository.py | 18 | ||||
-rw-r--r-- | synapse/media/media_storage.py | 223 | ||||
-rw-r--r-- | synapse/media/storage_provider.py | 40 |
7 files changed, 411 insertions, 22 deletions
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py index bac569e977..266675c9b8 100644 --- a/synapse/federation/transport/server/__init__.py +++ b/synapse/federation/transport/server/__init__.py @@ -19,6 +19,7 @@ # [This file includes modifications made by New Vector Limited] # # +import inspect import logging from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Type @@ -33,6 +34,7 @@ from synapse.federation.transport.server.federation import ( FEDERATION_SERVLET_CLASSES, FederationAccountStatusServlet, FederationUnstableClientKeysClaimServlet, + FederationUnstableMediaDownloadServlet, ) from synapse.http.server import HttpServer, JsonResource from synapse.http.servlet import ( @@ -315,6 +317,28 @@ def register_servlets( ): continue + if servletclass == FederationUnstableMediaDownloadServlet: + if ( + not hs.config.server.enable_media_repo + or not hs.config.experimental.msc3916_authenticated_media_enabled + ): + continue + + # don't load the endpoint if the storage provider is incompatible + media_repo = hs.get_media_repository() + load_download_endpoint = True + for provider in media_repo.media_storage.storage_providers: + signature = inspect.signature(provider.backend.fetch) + if "federation" not in signature.parameters: + logger.warning( + f"Federation media `/download` endpoint will not be enabled as storage provider {provider.backend} is not compatible with this endpoint." + ) + load_download_endpoint = False + break + + if not load_download_endpoint: + continue + servletclass( hs=hs, authenticator=authenticator, diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index db0f5076a9..4e2717b565 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -360,13 +360,29 @@ class BaseFederationServlet: "request" ) return None + if ( + func.__self__.__class__.__name__ # type: ignore + == "FederationUnstableMediaDownloadServlet" + ): + response = await func( + origin, content, request, *args, **kwargs + ) + else: + response = await func( + origin, content, request.args, *args, **kwargs + ) + else: + if ( + func.__self__.__class__.__name__ # type: ignore + == "FederationUnstableMediaDownloadServlet" + ): + response = await func( + origin, content, request, *args, **kwargs + ) + else: response = await func( origin, content, request.args, *args, **kwargs ) - else: - response = await func( - origin, content, request.args, *args, **kwargs - ) finally: # if we used the origin's context as the parent, add a new span using # the servlet span as a parent, so that we have a link diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index a59734785f..1f02451efa 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -44,10 +44,13 @@ from synapse.federation.transport.server._base import ( ) from synapse.http.servlet import ( parse_boolean_from_args, + parse_integer, parse_integer_from_args, parse_string_from_args, parse_strings_from_args, ) +from synapse.http.site import SynapseRequest +from synapse.media._base import DEFAULT_MAX_TIMEOUT_MS, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS from synapse.types import JsonDict from synapse.util import SYNAPSE_VERSION from synapse.util.ratelimitutils import FederationRateLimiter @@ -787,6 +790,43 @@ class FederationAccountStatusServlet(BaseFederationServerServlet): return 200, {"account_statuses": statuses, "failures": failures} +class FederationUnstableMediaDownloadServlet(BaseFederationServerServlet): + """ + Implementation of new federation media `/download` endpoint outlined in MSC3916. Returns + a multipart/form-data response consisting of a JSON object and the requested media + item. This endpoint only returns local media. + """ + + PATH = "/media/download/(?P<media_id>[^/]*)" + PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3916" + RATELIMIT = True + + def __init__( + self, + hs: "HomeServer", + ratelimiter: FederationRateLimiter, + authenticator: Authenticator, + server_name: str, + ): + super().__init__(hs, authenticator, ratelimiter, server_name) + self.media_repo = self.hs.get_media_repository() + + async def on_GET( + self, + origin: Optional[str], + content: Literal[None], + request: SynapseRequest, + media_id: str, + ) -> None: + max_timeout_ms = parse_integer( + request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS + ) + max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS) + await self.media_repo.get_local_media( + request, media_id, None, max_timeout_ms, federation=True + ) + + FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationSendServlet, FederationEventServlet, @@ -818,4 +858,5 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationV1SendKnockServlet, FederationMakeKnockServlet, FederationAccountStatusServlet, + FederationUnstableMediaDownloadServlet, ) diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 3fbed6062f..19bca94170 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -25,7 +25,16 @@ import os import urllib from abc import ABC, abstractmethod from types import TracebackType -from typing import Awaitable, Dict, Generator, List, Optional, Tuple, Type +from typing import ( + TYPE_CHECKING, + Awaitable, + Dict, + Generator, + List, + Optional, + Tuple, + Type, +) import attr @@ -39,6 +48,11 @@ from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable from synapse.util.stringutils import is_ascii +if TYPE_CHECKING: + from synapse.media.media_storage import MultipartResponder + from synapse.storage.databases.main.media_repository import LocalMedia + + logger = logging.getLogger(__name__) # list all text content types that will have the charset default to UTF-8 when @@ -260,6 +274,53 @@ def _can_encode_filename_as_token(x: str) -> bool: return True +async def respond_with_multipart_responder( + request: SynapseRequest, + responder: "Optional[MultipartResponder]", + media_info: "LocalMedia", +) -> None: + """ + Responds via a Multipart responder for the federation media `/download` requests + + Args: + request: the federation request to respond to + responder: the Multipart responder which will send the response + media_info: metadata about the media item + """ + if not responder: + respond_404(request) + return + + # If we have a responder we *must* use it as a context manager. + with responder: + if request._disconnected: + logger.warning( + "Not sending response to request %s, already disconnected.", request + ) + return + + logger.debug("Responding to media request with responder %s", responder) + if media_info.media_length is not None: + request.setHeader(b"Content-Length", b"%d" % (media_info.media_length,)) + request.setHeader( + b"Content-Type", b"multipart/mixed; boundary=%s" % responder.boundary + ) + + try: + await responder.write_to_consumer(request) + except Exception as e: + # The majority of the time this will be due to the client having gone + # away. Unfortunately, Twisted simply throws a generic exception at us + # in that case. + logger.warning("Failed to write to consumer: %s %s", type(e), e) + + # Unregister the producer, if it has one, so Twisted doesn't complain + if request.producer: + request.unregisterProducer() + + finish_request(request) + + async def respond_with_responder( request: SynapseRequest, responder: "Optional[Responder]", diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 6ed56099ca..c335e518a0 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -54,10 +54,11 @@ from synapse.media._base import ( ThumbnailInfo, get_filename_from_headers, respond_404, + respond_with_multipart_responder, respond_with_responder, ) from synapse.media.filepath import MediaFilePaths -from synapse.media.media_storage import MediaStorage +from synapse.media.media_storage import MediaStorage, MultipartResponder from synapse.media.storage_provider import StorageProviderWrapper from synapse.media.thumbnailer import Thumbnailer, ThumbnailError from synapse.media.url_previewer import UrlPreviewer @@ -429,6 +430,7 @@ class MediaRepository: media_id: str, name: Optional[str], max_timeout_ms: int, + federation: bool = False, ) -> None: """Responds to requests for local media, if exists, or returns 404. @@ -440,6 +442,7 @@ class MediaRepository: the filename in the Content-Disposition header of the response. max_timeout_ms: the maximum number of milliseconds to wait for the media to be uploaded. + federation: whether the local media being fetched is for a federation request Returns: Resolves once a response has successfully been written to request @@ -459,10 +462,17 @@ class MediaRepository: file_info = FileInfo(None, media_id, url_cache=bool(url_cache)) - responder = await self.media_storage.fetch_media(file_info) - await respond_with_responder( - request, responder, media_type, media_length, upload_name + responder = await self.media_storage.fetch_media( + file_info, media_info, federation ) + if federation: + # this really should be a Multipart responder but just in case + assert isinstance(responder, MultipartResponder) + await respond_with_multipart_responder(request, responder, media_info) + else: + await respond_with_responder( + request, responder, media_type, media_length, upload_name + ) async def get_remote_media( self, diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index b3cd3fd8f4..2f55d12b6b 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -19,9 +19,12 @@ # # import contextlib +import json import logging import os import shutil +from contextlib import closing +from io import BytesIO from types import TracebackType from typing import ( IO, @@ -30,14 +33,19 @@ from typing import ( AsyncIterator, BinaryIO, Callable, + List, Optional, Sequence, Tuple, Type, + Union, ) +from uuid import uuid4 import attr +from zope.interface import implementer +from twisted.internet import defer, interfaces from twisted.internet.defer import Deferred from twisted.internet.interfaces import IConsumer from twisted.protocols.basic import FileSender @@ -48,15 +56,19 @@ from synapse.logging.opentracing import start_active_span, trace, trace_with_opn from synapse.util import Clock from synapse.util.file_consumer import BackgroundFileConsumer +from ..storage.databases.main.media_repository import LocalMedia +from ..types import JsonDict from ._base import FileInfo, Responder from .filepath import MediaFilePaths if TYPE_CHECKING: - from synapse.media.storage_provider import StorageProvider + from synapse.media.storage_provider import StorageProviderWrapper from synapse.server import HomeServer logger = logging.getLogger(__name__) +CRLF = b"\r\n" + class MediaStorage: """Responsible for storing/fetching files from local sources. @@ -73,7 +85,7 @@ class MediaStorage: hs: "HomeServer", local_media_directory: str, filepaths: MediaFilePaths, - storage_providers: Sequence["StorageProvider"], + storage_providers: Sequence["StorageProviderWrapper"], ): self.hs = hs self.reactor = hs.get_reactor() @@ -169,15 +181,23 @@ class MediaStorage: raise e from None - async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]: + async def fetch_media( + self, + file_info: FileInfo, + media_info: Optional[LocalMedia] = None, + federation: bool = False, + ) -> Optional[Responder]: """Attempts to fetch media described by file_info from the local cache and configured storage providers. Args: - file_info + file_info: Metadata about the media file + media_info: Metadata about the media item + federation: Whether this file is being fetched for a federation request Returns: - Returns a Responder if the file was found, otherwise None. + If the file was found returns a Responder (a Multipart Responder if the requested + file is for the federation /download endpoint), otherwise None. """ paths = [self._file_info_to_path(file_info)] @@ -197,12 +217,19 @@ class MediaStorage: local_path = os.path.join(self.local_media_directory, path) if os.path.exists(local_path): logger.debug("responding with local file %s", local_path) - return FileResponder(open(local_path, "rb")) + if federation: + assert media_info is not None + boundary = uuid4().hex.encode("ascii") + return MultipartResponder( + open(local_path, "rb"), media_info, boundary + ) + else: + return FileResponder(open(local_path, "rb")) logger.debug("local file %s did not exist", local_path) for provider in self.storage_providers: for path in paths: - res: Any = await provider.fetch(path, file_info) + res: Any = await provider.fetch(path, file_info, media_info, federation) if res: logger.debug("Streaming %s from %s", path, provider) return res @@ -316,7 +343,7 @@ class FileResponder(Responder): """Wraps an open file that can be sent to a request. Args: - open_file: A file like object to be streamed ot the client, + open_file: A file like object to be streamed to the client, is closed when finished streaming. """ @@ -337,6 +364,38 @@ class FileResponder(Responder): self.open_file.close() +class MultipartResponder(Responder): + """Wraps an open file, formats the response according to MSC3916 and sends it to a + federation request. + + Args: + open_file: A file like object to be streamed to the client, + is closed when finished streaming. + media_info: metadata about the media item + boundary: bytes to use for the multipart response boundary + """ + + def __init__(self, open_file: IO, media_info: LocalMedia, boundary: bytes) -> None: + self.open_file = open_file + self.media_info = media_info + self.boundary = boundary + + def write_to_consumer(self, consumer: IConsumer) -> Deferred: + return make_deferred_yieldable( + MultipartFileSender().beginFileTransfer( + self.open_file, consumer, self.media_info.media_type, {}, self.boundary + ) + ) + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.open_file.close() + + class SpamMediaException(NotFoundError): """The media was blocked by a spam checker, so we simply 404 the request (in the same way as if it was quarantined). @@ -370,3 +429,151 @@ class ReadableFileWrapper: # We yield to the reactor by sleeping for 0 seconds. await self.clock.sleep(0) + + +@implementer(interfaces.IProducer) +class MultipartFileSender: + """ + A producer that sends the contents of a file to a federation request in the format + outlined in MSC3916 - a multipart/format-data response where the first field is a + JSON object and the second is the requested file. + + This is a slight re-writing of twisted.protocols.basic.FileSender to achieve the format + outlined above. + """ + + CHUNK_SIZE = 2**14 + + lastSent = "" + deferred: Optional[defer.Deferred] = None + + def beginFileTransfer( + self, + file: IO, + consumer: IConsumer, + file_content_type: str, + json_object: JsonDict, + boundary: bytes, + ) -> Deferred: + """ + Begin transferring a file + + Args: + file: The file object to read data from + consumer: The synapse request to write the data to + file_content_type: The content-type of the file + json_object: The JSON object to write to the first field of the response + boundary: bytes to be used as the multipart/form-data boundary + + Returns: A deferred whose callback will be invoked when the file has + been completely written to the consumer. The last byte written to the + consumer is passed to the callback. + """ + self.file: Optional[IO] = file + self.consumer = consumer + self.json_field = json_object + self.json_field_written = False + self.content_type_written = False + self.file_content_type = file_content_type + self.boundary = boundary + self.deferred: Deferred = defer.Deferred() + self.consumer.registerProducer(self, False) + # while it's not entirely clear why this assignment is necessary, it mirrors + # the behavior in FileSender.beginFileTransfer and thus is preserved here + deferred = self.deferred + return deferred + + def resumeProducing(self) -> None: + # write the first field, which will always be a json field + if not self.json_field_written: + self.consumer.write(CRLF + b"--" + self.boundary + CRLF) + + content_type = Header(b"Content-Type", b"application/json") + self.consumer.write(bytes(content_type) + CRLF) + + json_field = json.dumps(self.json_field) + json_bytes = json_field.encode("utf-8") + self.consumer.write(json_bytes) + self.consumer.write(CRLF + b"--" + self.boundary + CRLF) + + self.json_field_written = True + + chunk: Any = "" + if self.file: + # if we haven't written the content type yet, do so + if not self.content_type_written: + type = self.file_content_type.encode("utf-8") + content_type = Header(b"Content-Type", type) + self.consumer.write(bytes(content_type) + CRLF) + self.content_type_written = True + + chunk = self.file.read(self.CHUNK_SIZE) + + if not chunk: + # we've reached the end of the file + self.consumer.write(CRLF + b"--" + self.boundary + b"--" + CRLF) + self.file = None + self.consumer.unregisterProducer() + + if self.deferred: + self.deferred.callback(self.lastSent) + self.deferred = None + return + + self.consumer.write(chunk) + self.lastSent = chunk[-1:] + + def pauseProducing(self) -> None: + pass + + def stopProducing(self) -> None: + if self.deferred: + self.deferred.errback(Exception("Consumer asked us to stop producing")) + self.deferred = None + + +class Header: + """ + `Header` This class is a tiny wrapper that produces + request headers. We can't use standard python header + class because it encodes unicode fields using =? bla bla ?= + encoding, which is correct, but no one in HTTP world expects + that, everyone wants utf-8 raw bytes. (stolen from treq.multipart) + + """ + + def __init__( + self, + name: bytes, + value: Any, + params: Optional[List[Tuple[Any, Any]]] = None, + ): + self.name = name + self.value = value + self.params = params or [] + + def add_param(self, name: Any, value: Any) -> None: + self.params.append((name, value)) + + def __bytes__(self) -> bytes: + with closing(BytesIO()) as h: + h.write(self.name + b": " + escape(self.value).encode("us-ascii")) + if self.params: + for name, val in self.params: + h.write(b"; ") + h.write(escape(name).encode("us-ascii")) + h.write(b"=") + h.write(b'"' + escape(val).encode("utf-8") + b'"') + h.seek(0) + return h.read() + + +def escape(value: Union[str, bytes]) -> str: + """ + This function prevents header values from corrupting the request, + a newline in the file name parameter makes form-data request unreadable + for a majority of parsers. (stolen from treq.multipart) + """ + if isinstance(value, bytes): + value = value.decode("utf-8") + return value.replace("\r", "").replace("\n", "").replace('"', '\\"') diff --git a/synapse/media/storage_provider.py b/synapse/media/storage_provider.py index 06e5d27a53..a2d50adf65 100644 --- a/synapse/media/storage_provider.py +++ b/synapse/media/storage_provider.py @@ -24,14 +24,16 @@ import logging import os import shutil from typing import TYPE_CHECKING, Callable, Optional +from uuid import uuid4 from synapse.config._base import Config from synapse.logging.context import defer_to_thread, run_in_background from synapse.logging.opentracing import start_active_span, trace_with_opname from synapse.util.async_helpers import maybe_awaitable +from ..storage.databases.main.media_repository import LocalMedia from ._base import FileInfo, Responder -from .media_storage import FileResponder +from .media_storage import FileResponder, MultipartResponder logger = logging.getLogger(__name__) @@ -55,13 +57,21 @@ class StorageProvider(metaclass=abc.ABCMeta): """ @abc.abstractmethod - async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]: + async def fetch( + self, + path: str, + file_info: FileInfo, + media_info: Optional[LocalMedia] = None, + federation: bool = False, + ) -> Optional[Responder]: """Attempt to fetch the file described by file_info and stream it into writer. Args: path: Relative path of file in local cache file_info: The metadata of the file. + media_info: metadata of the media item + federation: Whether the requested media is for a federation request Returns: Returns a Responder if the provider has the file, otherwise returns None. @@ -124,7 +134,13 @@ class StorageProviderWrapper(StorageProvider): run_in_background(store) @trace_with_opname("StorageProviderWrapper.fetch") - async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]: + async def fetch( + self, + path: str, + file_info: FileInfo, + media_info: Optional[LocalMedia] = None, + federation: bool = False, + ) -> Optional[Responder]: if file_info.url_cache: # Files in the URL preview cache definitely aren't stored here, # so avoid any potentially slow I/O or network access. @@ -132,7 +148,9 @@ class StorageProviderWrapper(StorageProvider): # store_file is supposed to return an Awaitable, but guard # against improper implementations. - return await maybe_awaitable(self.backend.fetch(path, file_info)) + return await maybe_awaitable( + self.backend.fetch(path, file_info, media_info, federation) + ) class FileStorageProviderBackend(StorageProvider): @@ -172,11 +190,23 @@ class FileStorageProviderBackend(StorageProvider): ) @trace_with_opname("FileStorageProviderBackend.fetch") - async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]: + async def fetch( + self, + path: str, + file_info: FileInfo, + media_info: Optional[LocalMedia] = None, + federation: bool = False, + ) -> Optional[Responder]: """See StorageProvider.fetch""" backup_fname = os.path.join(self.base_directory, path) if os.path.isfile(backup_fname): + if federation: + assert media_info is not None + boundary = uuid4().hex.encode("ascii") + return MultipartResponder( + open(backup_fname, "rb"), media_info, boundary + ) return FileResponder(open(backup_fname, "rb")) return None |