From 999bd77d3abb7b0a4430f31f5912956c3bc100ee Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 15 Nov 2023 07:19:24 -0700 Subject: Asynchronous Uploads (#15503) Support asynchronous uploads as defined in MSC2246. --- synapse/rest/media/create_resource.py | 83 +++++++++++++++++++++++++ synapse/rest/media/download_resource.py | 22 ++++--- synapse/rest/media/media_repository_resource.py | 8 ++- synapse/rest/media/thumbnail_resource.py | 69 ++++++++++++-------- synapse/rest/media/upload_resource.py | 75 +++++++++++++++++++--- 5 files changed, 215 insertions(+), 42 deletions(-) create mode 100644 synapse/rest/media/create_resource.py (limited to 'synapse/rest') diff --git a/synapse/rest/media/create_resource.py b/synapse/rest/media/create_resource.py new file mode 100644 index 0000000000..994afdf13c --- /dev/null +++ b/synapse/rest/media/create_resource.py @@ -0,0 +1,83 @@ +# Copyright 2023 Beeper Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +import re +from typing import TYPE_CHECKING + +from synapse.api.errors import LimitExceededError +from synapse.api.ratelimiting import Ratelimiter +from synapse.http.server import respond_with_json +from synapse.http.servlet import RestServlet +from synapse.http.site import SynapseRequest + +if TYPE_CHECKING: + from synapse.media.media_repository import MediaRepository + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class CreateResource(RestServlet): + PATTERNS = [re.compile("/_matrix/media/v1/create")] + + def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): + super().__init__() + + self.media_repo = media_repo + self.clock = hs.get_clock() + self.auth = hs.get_auth() + self.max_pending_media_uploads = hs.config.media.max_pending_media_uploads + + # A rate limiter for creating new media IDs. + self._create_media_rate_limiter = Ratelimiter( + store=hs.get_datastores().main, + clock=self.clock, + cfg=hs.config.ratelimiting.rc_media_create, + ) + + async def on_POST(self, request: SynapseRequest) -> None: + requester = await self.auth.get_user_by_req(request) + + # If the create media requests for the user are over the limit, drop them. 
+ await self._create_media_rate_limiter.ratelimit(requester) + + ( + reached_pending_limit, + first_expiration_ts, + ) = await self.media_repo.reached_pending_media_limit(requester.user) + if reached_pending_limit: + raise LimitExceededError( + limiter_name="max_pending_media_uploads", + retry_after_ms=first_expiration_ts - self.clock.time_msec(), + ) + + content_uri, unused_expires_at = await self.media_repo.create_media_id( + requester.user + ) + + logger.info( + "Created Media URI %r that if unused will expire at %d", + content_uri, + unused_expires_at, + ) + respond_with_json( + request, + 200, + { + "content_uri": content_uri, + "unused_expires_at": unused_expires_at, + }, + send_cors=True, + ) diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py index 65b9ff52fa..60cd87548c 100644 --- a/synapse/rest/media/download_resource.py +++ b/synapse/rest/media/download_resource.py @@ -17,9 +17,13 @@ import re from typing import TYPE_CHECKING, Optional from synapse.http.server import set_corp_headers, set_cors_headers -from synapse.http.servlet import RestServlet, parse_boolean +from synapse.http.servlet import RestServlet, parse_boolean, parse_integer from synapse.http.site import SynapseRequest -from synapse.media._base import respond_404 +from synapse.media._base import ( + DEFAULT_MAX_TIMEOUT_MS, + MAXIMUM_ALLOWED_MAX_TIMEOUT_MS, + respond_404, +) from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: @@ -65,12 +69,16 @@ class DownloadResource(RestServlet): ) # Limited non-standard form of CSP for IE11 request.setHeader(b"X-Content-Security-Policy", b"sandbox;") - request.setHeader( - b"Referrer-Policy", - b"no-referrer", + request.setHeader(b"Referrer-Policy", b"no-referrer") + max_timeout_ms = parse_integer( + request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS ) + max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS) + if self._is_mine_server_name(server_name): - await 
self.media_repo.get_local_media(request, media_id, file_name) + await self.media_repo.get_local_media( + request, media_id, file_name, max_timeout_ms + ) else: allow_remote = parse_boolean(request, "allow_remote", default=True) if not allow_remote: @@ -83,5 +91,5 @@ class DownloadResource(RestServlet): return await self.media_repo.get_remote_media( - request, server_name, media_id, file_name + request, server_name, media_id, file_name, max_timeout_ms ) diff --git a/synapse/rest/media/media_repository_resource.py b/synapse/rest/media/media_repository_resource.py index 2089bb1029..ca65116b84 100644 --- a/synapse/rest/media/media_repository_resource.py +++ b/synapse/rest/media/media_repository_resource.py @@ -18,10 +18,11 @@ from synapse.config._base import ConfigError from synapse.http.server import HttpServer, JsonResource from .config_resource import MediaConfigResource +from .create_resource import CreateResource from .download_resource import DownloadResource from .preview_url_resource import PreviewUrlResource from .thumbnail_resource import ThumbnailResource -from .upload_resource import UploadResource +from .upload_resource import AsyncUploadServlet, UploadServlet if TYPE_CHECKING: from synapse.server import HomeServer @@ -91,8 +92,9 @@ class MediaRepositoryResource(JsonResource): # Note that many of these should not exist as v1 endpoints, but empirically # a lot of traffic still goes to them. 
- - UploadResource(hs, media_repo).register(http_server) + CreateResource(hs, media_repo).register(http_server) + UploadServlet(hs, media_repo).register(http_server) + AsyncUploadServlet(hs, media_repo).register(http_server) DownloadResource(hs, media_repo).register(http_server) ThumbnailResource(hs, media_repo, media_repo.media_storage).register( http_server diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py index efda8b4ab4..681f2a5a27 100644 --- a/synapse/rest/media/thumbnail_resource.py +++ b/synapse/rest/media/thumbnail_resource.py @@ -23,6 +23,8 @@ from synapse.http.server import respond_with_json, set_corp_headers, set_cors_he from synapse.http.servlet import RestServlet, parse_integer, parse_string from synapse.http.site import SynapseRequest from synapse.media._base import ( + DEFAULT_MAX_TIMEOUT_MS, + MAXIMUM_ALLOWED_MAX_TIMEOUT_MS, FileInfo, ThumbnailInfo, respond_404, @@ -75,15 +77,19 @@ class ThumbnailResource(RestServlet): method = parse_string(request, "method", "scale") # TODO Parse the Accept header to get an prioritised list of thumbnail types. 
m_type = "image/png" + max_timeout_ms = parse_integer( + request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS + ) + max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS) if self._is_mine_server_name(server_name): if self.dynamic_thumbnails: await self._select_or_generate_local_thumbnail( - request, media_id, width, height, method, m_type + request, media_id, width, height, method, m_type, max_timeout_ms ) else: await self._respond_local_thumbnail( - request, media_id, width, height, method, m_type + request, media_id, width, height, method, m_type, max_timeout_ms ) self.media_repo.mark_recently_accessed(None, media_id) else: @@ -95,14 +101,21 @@ class ThumbnailResource(RestServlet): respond_404(request) return - if self.dynamic_thumbnails: - await self._select_or_generate_remote_thumbnail( - request, server_name, media_id, width, height, method, m_type - ) - else: - await self._respond_remote_thumbnail( - request, server_name, media_id, width, height, method, m_type - ) + remote_resp_function = ( + self._select_or_generate_remote_thumbnail + if self.dynamic_thumbnails + else self._respond_remote_thumbnail + ) + await remote_resp_function( + request, + server_name, + media_id, + width, + height, + method, + m_type, + max_timeout_ms, + ) self.media_repo.mark_recently_accessed(server_name, media_id) async def _respond_local_thumbnail( @@ -113,15 +126,12 @@ class ThumbnailResource(RestServlet): height: int, method: str, m_type: str, + max_timeout_ms: int, ) -> None: - media_info = await self.store.get_local_media(media_id) - + media_info = await self.media_repo.get_local_media_info( + request, media_id, max_timeout_ms + ) if not media_info: - respond_404(request) - return - if media_info.quarantined_by: - logger.info("Media is quarantined") - respond_404(request) return thumbnail_infos = await self.store.get_local_media_thumbnails(media_id) @@ -146,15 +156,13 @@ class ThumbnailResource(RestServlet): desired_height: int, desired_method: str, 
desired_type: str, + max_timeout_ms: int, ) -> None: - media_info = await self.store.get_local_media(media_id) + media_info = await self.media_repo.get_local_media_info( + request, media_id, max_timeout_ms + ) if not media_info: - respond_404(request) - return - if media_info.quarantined_by: - logger.info("Media is quarantined") - respond_404(request) return thumbnail_infos = await self.store.get_local_media_thumbnails(media_id) @@ -206,8 +214,14 @@ class ThumbnailResource(RestServlet): desired_height: int, desired_method: str, desired_type: str, + max_timeout_ms: int, ) -> None: - media_info = await self.media_repo.get_remote_media_info(server_name, media_id) + media_info = await self.media_repo.get_remote_media_info( + server_name, media_id, max_timeout_ms + ) + if not media_info: + respond_404(request) + return thumbnail_infos = await self.store.get_remote_media_thumbnails( server_name, media_id @@ -263,11 +277,16 @@ class ThumbnailResource(RestServlet): height: int, method: str, m_type: str, + max_timeout_ms: int, ) -> None: # TODO: Don't download the whole remote file # We should proxy the thumbnail from the remote server instead of # downloading the remote file and generating our own thumbnails. 
- media_info = await self.media_repo.get_remote_media_info(server_name, media_id) + media_info = await self.media_repo.get_remote_media_info( + server_name, media_id, max_timeout_ms + ) + if not media_info: + return thumbnail_infos = await self.store.get_remote_media_thumbnails( server_name, media_id diff --git a/synapse/rest/media/upload_resource.py b/synapse/rest/media/upload_resource.py index 949326d85d..62d3e228a8 100644 --- a/synapse/rest/media/upload_resource.py +++ b/synapse/rest/media/upload_resource.py @@ -15,7 +15,7 @@ import logging import re -from typing import IO, TYPE_CHECKING, Dict, List, Optional +from typing import IO, TYPE_CHECKING, Dict, List, Optional, Tuple from synapse.api.errors import Codes, SynapseError from synapse.http.server import respond_with_json @@ -29,23 +29,24 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# The name of the lock to use when uploading media. +_UPLOAD_MEDIA_LOCK_NAME = "upload_media" -class UploadResource(RestServlet): - PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/upload")] +class BaseUploadServlet(RestServlet): def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): super().__init__() self.media_repo = media_repo self.filepaths = media_repo.filepaths self.store = hs.get_datastores().main - self.clock = hs.get_clock() + self.server_name = hs.hostname self.auth = hs.get_auth() self.max_upload_size = hs.config.media.max_upload_size - self.clock = hs.get_clock() - async def on_POST(self, request: SynapseRequest) -> None: - requester = await self.auth.get_user_by_req(request) + def _get_file_metadata( + self, request: SynapseRequest + ) -> Tuple[int, Optional[str], str]: raw_content_length = request.getHeader("Content-Length") if raw_content_length is None: raise SynapseError(msg="Request must specify a Content-Length", code=400) @@ -88,6 +89,16 @@ class UploadResource(RestServlet): # disposition = headers.getRawHeaders(b"Content-Disposition")[0] # TODO(markjh): parse content-dispostion + 
return content_length, upload_name, media_type + + +class UploadServlet(BaseUploadServlet): + PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/upload$")] + + async def on_POST(self, request: SynapseRequest) -> None: + requester = await self.auth.get_user_by_req(request) + content_length, upload_name, media_type = self._get_file_metadata(request) + try: content: IO = request.content # type: ignore content_uri = await self.media_repo.create_content( @@ -103,3 +114,53 @@ class UploadResource(RestServlet): respond_with_json( request, 200, {"content_uri": str(content_uri)}, send_cors=True ) + + +class AsyncUploadServlet(BaseUploadServlet): + PATTERNS = [ + re.compile( + "/_matrix/media/v3/upload/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)$" + ) + ] + + async def on_PUT( + self, request: SynapseRequest, server_name: str, media_id: str + ) -> None: + requester = await self.auth.get_user_by_req(request) + + if server_name != self.server_name: + raise SynapseError( + 404, + "Non-local server name specified", + errcode=Codes.NOT_FOUND, + ) + + lock = await self.store.try_acquire_lock(_UPLOAD_MEDIA_LOCK_NAME, media_id) + if not lock: + raise SynapseError( + 409, + "Media ID cannot be overwritten", + errcode=Codes.CANNOT_OVERWRITE_MEDIA, + ) + + async with lock: + await self.media_repo.verify_can_upload(media_id, requester.user) + content_length, upload_name, media_type = self._get_file_metadata(request) + + try: + content: IO = request.content # type: ignore + await self.media_repo.update_content( + media_id, + media_type, + upload_name, + content, + content_length, + requester.user, + ) + except SpamMediaException: + # For uploading of media we want to respond with a 400, instead of + # the default 404, as that would just be confusing. + raise SynapseError(400, "Bad content") + + logger.info("Uploaded content for media ID %r", media_id) + respond_with_json(request, 200, {}, send_cors=True) -- cgit 1.4.1