diff options
Diffstat (limited to 'synapse/rest/media')
-rw-r--r-- | synapse/rest/media/v0/content_repository.py | 18 | ||||
-rw-r--r-- | synapse/rest/media/v1/_base.py | 169 | ||||
-rw-r--r-- | synapse/rest/media/v1/download_resource.py | 86 | ||||
-rw-r--r-- | synapse/rest/media/v1/filepath.py | 176 | ||||
-rw-r--r-- | synapse/rest/media/v1/identicon_resource.py | 7 | ||||
-rw-r--r-- | synapse/rest/media/v1/media_repository.py | 798 | ||||
-rw-r--r-- | synapse/rest/media/v1/media_storage.py | 265 | ||||
-rw-r--r-- | synapse/rest/media/v1/preview_url_resource.py | 303 | ||||
-rw-r--r-- | synapse/rest/media/v1/storage_provider.py | 144 | ||||
-rw-r--r-- | synapse/rest/media/v1/thumbnail_resource.py | 237 | ||||
-rw-r--r-- | synapse/rest/media/v1/thumbnailer.py | 29 | ||||
-rw-r--r-- | synapse/rest/media/v1/upload_resource.py | 26 |
12 files changed, 1564 insertions, 694 deletions
diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py index 956bd5da75..f255f2883f 100644 --- a/synapse/rest/media/v0/content_repository.py +++ b/synapse/rest/media/v0/content_repository.py @@ -13,21 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.http.server import respond_with_json_bytes, finish_request - -from synapse.api.errors import ( - Codes, cs_error -) - -from twisted.protocols.basic import FileSender -from twisted.web import server, resource - import base64 -import simplejson as json import logging import os import re +from canonicaljson import json + +from twisted.protocols.basic import FileSender +from twisted.web import resource, server + +from synapse.api.errors import Codes, cs_error +from synapse.http.server import finish_request, respond_with_json_bytes + logger = logging.getLogger(__name__) diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index b9600f2167..65f4bd2910 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -13,22 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.http.server import respond_with_json, finish_request -from synapse.api.errors import ( - cs_error, Codes, SynapseError -) +import logging +import os +import urllib + +from six.moves.urllib import parse as urlparse from twisted.internet import defer from twisted.protocols.basic import FileSender +from synapse.api.errors import Codes, SynapseError, cs_error +from synapse.http.server import finish_request, respond_with_json +from synapse.util import logcontext from synapse.util.stringutils import is_ascii -import os - -import logging -import urllib -import urlparse - logger = logging.getLogger(__name__) @@ -44,7 +42,7 @@ def parse_media_id(request): except UnicodeDecodeError: pass return server_name, media_id, file_name - except: + except Exception: raise SynapseError( 404, "Invalid media id token %r" % (request.postpath,), @@ -69,42 +67,133 @@ def respond_with_file(request, media_type, file_path, logger.debug("Responding with %r", file_path) if os.path.isfile(file_path): - request.setHeader(b"Content-Type", media_type.encode("UTF-8")) - if upload_name: - if is_ascii(upload_name): - request.setHeader( - b"Content-Disposition", - b"inline; filename=%s" % ( - urllib.quote(upload_name.encode("utf-8")), - ), - ) - else: - request.setHeader( - b"Content-Disposition", - b"inline; filename*=utf-8''%s" % ( - urllib.quote(upload_name.encode("utf-8")), - ), - ) - - # cache for at least a day. - # XXX: we might want to turn this off for data we don't want to - # recommend caching as it's sensitive or private - or at least - # select private. don't bother setting Expires as all our - # clients are smart enough to be happy with Cache-Control - request.setHeader( - b"Cache-Control", b"public,max-age=86400,s-maxage=86400" - ) if file_size is None: stat = os.stat(file_path) file_size = stat.st_size - request.setHeader( - b"Content-Length", b"%d" % (file_size,) - ) + add_file_headers(request, media_type, file_size, upload_name) with open(file_path, "rb") as f: - yield FileSender().beginFileTransfer(f, request) + yield logcontext.make_deferred_yieldable( + FileSender().beginFileTransfer(f, request) + ) finish_request(request) else: respond_404(request) + + +def add_file_headers(request, media_type, file_size, upload_name): + """Adds the correct response headers in preparation for responding with the + media. + + Args: + request (twisted.web.http.Request) + media_type (str): The media/content type. + file_size (int): Size in bytes of the media, if known. + upload_name (str): The name of the requested file, if any. + """ + request.setHeader(b"Content-Type", media_type.encode("UTF-8")) + if upload_name: + if is_ascii(upload_name): + request.setHeader( + b"Content-Disposition", + b"inline; filename=%s" % ( + urllib.quote(upload_name.encode("utf-8")), + ), + ) + else: + request.setHeader( + b"Content-Disposition", + b"inline; filename*=utf-8''%s" % ( + urllib.quote(upload_name.encode("utf-8")), + ), + ) + + # cache for at least a day. + # XXX: we might want to turn this off for data we don't want to + # recommend caching as it's sensitive or private - or at least + # select private. don't bother setting Expires as all our + # clients are smart enough to be happy with Cache-Control + request.setHeader( + b"Cache-Control", b"public,max-age=86400,s-maxage=86400" + ) + + request.setHeader( + b"Content-Length", b"%d" % (file_size,) + ) + + +@defer.inlineCallbacks +def respond_with_responder(request, responder, media_type, file_size, upload_name=None): + """Responds to the request with given responder. If responder is None then + returns 404. + + Args: + request (twisted.web.http.Request) + responder (Responder|None) + media_type (str): The media/content type. + file_size (int|None): Size in bytes of the media. If not known it should be None + upload_name (str|None): The name of the requested file, if any. + """ + if not responder: + respond_404(request) + return + + logger.debug("Responding to media request with responder %s") + add_file_headers(request, media_type, file_size, upload_name) + with responder: + yield responder.write_to_consumer(request) + finish_request(request) + + +class Responder(object): + """Represents a response that can be streamed to the requester. + + Responder is a context manager which *must* be used, so that any resources + held can be cleaned up. + """ + def write_to_consumer(self, consumer): + """Stream response into consumer + + Args: + consumer (IConsumer) + + Returns: + Deferred: Resolves once the response has finished being written + """ + pass + + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +class FileInfo(object): + """Details about a requested/uploaded file. + + Attributes: + server_name (str): The server name where the media originated from, + or None if local. + file_id (str): The local ID of the file. For local files this is the + same as the media_id + url_cache (bool): If the file is for the url preview cache + thumbnail (bool): Whether the file is a thumbnail or not. + thumbnail_width (int) + thumbnail_height (int) + thumbnail_method (str) + thumbnail_type (str): Content type of thumbnail, e.g. image/png + """ + def __init__(self, server_name, file_id, url_cache=False, + thumbnail=False, thumbnail_width=None, thumbnail_height=None, + thumbnail_method=None, thumbnail_type=None): + self.server_name = server_name + self.file_id = file_id + self.url_cache = url_cache + self.thumbnail = thumbnail + self.thumbnail_width = thumbnail_width + self.thumbnail_height = thumbnail_height + self.thumbnail_method = thumbnail_method + self.thumbnail_type = thumbnail_type diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 6879249c8a..fbfa85f74f 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -12,16 +12,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import synapse.http.servlet +import logging -from ._base import parse_media_id, respond_with_file, respond_404 +from twisted.internet import defer from twisted.web.resource import Resource -from synapse.http.server import request_handler, set_cors_headers - from twisted.web.server import NOT_DONE_YET -from twisted.internet import defer -import logging +import synapse.http.servlet +from synapse.http.server import set_cors_headers, wrap_json_request_handler + +from ._base import parse_media_id, respond_404 logger = logging.getLogger(__name__) @@ -32,18 +32,17 @@ class DownloadResource(Resource): def __init__(self, hs, media_repo): Resource.__init__(self) - self.filepaths = media_repo.filepaths self.media_repo = media_repo self.server_name = hs.hostname - self.store = hs.get_datastore() - self.version_string = hs.version_string + + # this is expected by @wrap_json_request_handler self.clock = hs.get_clock() def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET - @request_handler() + @wrap_json_request_handler @defer.inlineCallbacks def _async_render_GET(self, request): set_cors_headers(request) @@ -57,59 +56,16 @@ class DownloadResource(Resource): ) server_name, media_id, name = parse_media_id(request) if server_name == self.server_name: - yield self._respond_local_file(request, media_id, name) + yield self.media_repo.get_local_media(request, media_id, name) else: - yield self._respond_remote_file( - request, server_name, media_id, name - ) - - @defer.inlineCallbacks - def _respond_local_file(self, request, media_id, name): - media_info = yield self.store.get_local_media(media_id) - if not media_info or media_info["quarantined_by"]: - respond_404(request) - return - - media_type = media_info["media_type"] - media_length = media_info["media_length"] - upload_name = name if name else media_info["upload_name"] - if media_info["url_cache"]: - # TODO: Check the file still exists, if it doesn't we can redownload - # it from the url `media_info["url_cache"]` - file_path = self.filepaths.url_cache_filepath(media_id) - else: - file_path = self.filepaths.local_media_filepath(media_id) - - yield respond_with_file( - request, media_type, file_path, media_length, - upload_name=upload_name, - ) - - @defer.inlineCallbacks - def _respond_remote_file(self, request, server_name, media_id, name): - # don't forward requests for remote media if allow_remote is false - allow_remote = synapse.http.servlet.parse_boolean( - request, "allow_remote", default=True) - if not allow_remote: - logger.info( - "Rejecting request for remote media %s/%s due to allow_remote", - server_name, media_id, - ) - respond_404(request) - return - - media_info = yield self.media_repo.get_remote_media(server_name, media_id) - - media_type = media_info["media_type"] - media_length = media_info["media_length"] - filesystem_id = media_info["filesystem_id"] - upload_name = name if name else media_info["upload_name"] - - file_path = self.filepaths.remote_media_filepath( - server_name, filesystem_id - ) - - yield respond_with_file( - request, media_type, file_path, media_length, - upload_name=upload_name, - ) + allow_remote = synapse.http.servlet.parse_boolean( + request, "allow_remote", default=True) + if not allow_remote: + logger.info( + "Rejecting request for remote media %s/%s due to allow_remote", + server_name, media_id, + ) + respond_404(request) + return + + yield self.media_repo.get_remote_media(request, server_name, media_id, name) diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index d92b7ff337..c8586fa280 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -13,79 +13,201 @@ # See the License for the specific language governing permissions and # limitations under the License. +import functools import os +import re + +NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d") + + +def _wrap_in_base_path(func): + """Takes a function that returns a relative path and turns it into an + absolute path based on the location of the primary media store + """ + @functools.wraps(func) + def _wrapped(self, *args, **kwargs): + path = func(self, *args, **kwargs) + return os.path.join(self.base_path, path) + + return _wrapped class MediaFilePaths(object): + """Describes where files are stored on disk. - def __init__(self, base_path): - self.base_path = base_path + Most of the functions have a `*_rel` variant which returns a file path that + is relative to the base media store path. This is mainly used when we want + to write to the backup media store (when one is configured) + """ - def default_thumbnail(self, default_top_level, default_sub_type, width, - height, content_type, method): + def __init__(self, primary_base_path): + self.base_path = primary_base_path + + def default_thumbnail_rel(self, default_top_level, default_sub_type, width, + height, content_type, method): top_level_type, sub_type = content_type.split("/") file_name = "%i-%i-%s-%s-%s" % ( width, height, top_level_type, sub_type, method ) return os.path.join( - self.base_path, "default_thumbnails", default_top_level, + "default_thumbnails", default_top_level, default_sub_type, file_name ) - def local_media_filepath(self, media_id): + default_thumbnail = _wrap_in_base_path(default_thumbnail_rel) + + def local_media_filepath_rel(self, media_id): return os.path.join( - self.base_path, "local_content", + "local_content", media_id[0:2], media_id[2:4], media_id[4:] ) - def local_media_thumbnail(self, media_id, width, height, content_type, - method): + local_media_filepath = _wrap_in_base_path(local_media_filepath_rel) + + def local_media_thumbnail_rel(self, media_id, width, height, content_type, + method): top_level_type, sub_type = content_type.split("/") file_name = "%i-%i-%s-%s-%s" % ( width, height, top_level_type, sub_type, method ) return os.path.join( - self.base_path, "local_thumbnails", + "local_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], file_name ) - def remote_media_filepath(self, server_name, file_id): + local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel) + + def remote_media_filepath_rel(self, server_name, file_id): return os.path.join( - self.base_path, "remote_content", server_name, + "remote_content", server_name, file_id[0:2], file_id[2:4], file_id[4:] ) - def remote_media_thumbnail(self, server_name, file_id, width, height, - content_type, method): + remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel) + + def remote_media_thumbnail_rel(self, server_name, file_id, width, height, + content_type, method): top_level_type, sub_type = content_type.split("/") file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type) return os.path.join( - self.base_path, "remote_thumbnail", server_name, + "remote_thumbnail", server_name, file_id[0:2], file_id[2:4], file_id[4:], file_name ) + remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel) + def remote_media_thumbnail_dir(self, server_name, file_id): return os.path.join( self.base_path, "remote_thumbnail", server_name, file_id[0:2], file_id[2:4], file_id[4:], ) - def url_cache_filepath(self, media_id): - return os.path.join( - self.base_path, "url_cache", - media_id[0:2], media_id[2:4], media_id[4:] - ) + def url_cache_filepath_rel(self, media_id): + if NEW_FORMAT_ID_RE.match(media_id): + # Media id is of the form <DATE><RANDOM_STRING> + # E.g.: 2017-09-28-fsdRDt24DS234dsf + return os.path.join( + "url_cache", + media_id[:10], media_id[11:] + ) + else: + return os.path.join( + "url_cache", + media_id[0:2], media_id[2:4], media_id[4:], + ) + + url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel) + + def url_cache_filepath_dirs_to_delete(self, media_id): + "The dirs to try and remove if we delete the media_id file" + if NEW_FORMAT_ID_RE.match(media_id): + return [ + os.path.join( + self.base_path, "url_cache", + media_id[:10], + ), + ] + else: + return [ + os.path.join( + self.base_path, "url_cache", + media_id[0:2], media_id[2:4], + ), + os.path.join( + self.base_path, "url_cache", + media_id[0:2], + ), + ] + + def url_cache_thumbnail_rel(self, media_id, width, height, content_type, + method): + # Media id is of the form <DATE><RANDOM_STRING> + # E.g.: 2017-09-28-fsdRDt24DS234dsf - def url_cache_thumbnail(self, media_id, width, height, content_type, - method): top_level_type, sub_type = content_type.split("/") file_name = "%i-%i-%s-%s-%s" % ( width, height, top_level_type, sub_type, method ) - return os.path.join( - self.base_path, "url_cache_thumbnails", - media_id[0:2], media_id[2:4], media_id[4:], - file_name - ) + + if NEW_FORMAT_ID_RE.match(media_id): + return os.path.join( + "url_cache_thumbnails", + media_id[:10], media_id[11:], + file_name + ) + else: + return os.path.join( + "url_cache_thumbnails", + media_id[0:2], media_id[2:4], media_id[4:], + file_name + ) + + url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel) + + def url_cache_thumbnail_directory(self, media_id): + # Media id is of the form <DATE><RANDOM_STRING> + # E.g.: 2017-09-28-fsdRDt24DS234dsf + + if NEW_FORMAT_ID_RE.match(media_id): + return os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], media_id[11:], + ) + else: + return os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[0:2], media_id[2:4], media_id[4:], + ) + + def url_cache_thumbnail_dirs_to_delete(self, media_id): + "The dirs to try and remove if we delete the media_id thumbnails" + # Media id is of the form <DATE><RANDOM_STRING> + # E.g.: 2017-09-28-fsdRDt24DS234dsf + if NEW_FORMAT_ID_RE.match(media_id): + return [ + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], media_id[11:], + ), + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[:10], + ), + ] + else: + return [ + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[0:2], media_id[2:4], media_id[4:], + ), + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[0:2], media_id[2:4], + ), + os.path.join( + self.base_path, "url_cache_thumbnails", + media_id[0:2], + ), + ] diff --git a/synapse/rest/media/v1/identicon_resource.py b/synapse/rest/media/v1/identicon_resource.py index 66f2b6bd30..bdbd8d50dd 100644 --- a/synapse/rest/media/v1/identicon_resource.py +++ b/synapse/rest/media/v1/identicon_resource.py @@ -13,8 +13,11 @@ # limitations under the License. from pydenticon import Generator + from twisted.web.resource import Resource +from synapse.http.servlet import parse_integer + FOREGROUND = [ "rgb(45,79,255)", "rgb(254,180,44)", @@ -55,8 +58,8 @@ class IdenticonResource(Resource): def render_GET(self, request): name = "/".join(request.postpath) - width = int(request.args.get("width", [96])[0]) - height = int(request.args.get("height", [96])[0]) + width = parse_integer(request, "width", default=96) + height = parse_integer(request, "height", default=96) identicon_bytes = self.generate_identicon(name, width, height) request.setHeader(b"Content-Type", b"image/png") request.setHeader( diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 0ea1248ce6..30242c525a 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,45 +14,52 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, threads +import cgi +import errno +import logging +import os +import shutil + +from six import iteritems +from six.moves.urllib import parse as urlparse + import twisted.internet.error import twisted.web.http +from twisted.internet import defer, threads from twisted.web.resource import Resource -from .upload_resource import UploadResource -from .download_resource import DownloadResource -from .thumbnail_resource import ThumbnailResource -from .identicon_resource import IdenticonResource -from .preview_url_resource import PreviewUrlResource -from .filepath import MediaFilePaths -from .thumbnailer import Thumbnailer - +from synapse.api.errors import ( + FederationDeniedError, + HttpResponseException, + NotFoundError, + SynapseError, +) from synapse.http.matrixfederationclient import MatrixFederationHttpClient -from synapse.util.stringutils import random_string -from synapse.api.errors import SynapseError, HttpResponseException, \ - NotFoundError - from synapse.util.async import Linearizer -from synapse.util.stringutils import is_ascii -from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.logcontext import make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination +from synapse.util.stringutils import is_ascii, random_string -import os -import errno -import shutil - -import cgi -import logging -import urlparse +from ._base import FileInfo, respond_404, respond_with_responder +from .download_resource import DownloadResource +from .filepath import MediaFilePaths +from .identicon_resource import IdenticonResource +from .media_storage import MediaStorage +from .preview_url_resource import PreviewUrlResource +from .storage_provider import StorageProviderWrapper +from .thumbnail_resource import ThumbnailResource +from .thumbnailer import Thumbnailer +from .upload_resource import UploadResource logger = logging.getLogger(__name__) -UPDATE_RECENTLY_ACCESSED_REMOTES_TS = 60 * 1000 +UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000 class MediaRepository(object): def __init__(self, hs): + self.hs = hs self.auth = hs.get_auth() self.client = MatrixFederationHttpClient(hs) self.clock = hs.get_clock() @@ -59,46 +67,90 @@ class MediaRepository(object): self.store = hs.get_datastore() self.max_upload_size = hs.config.max_upload_size self.max_image_pixels = hs.config.max_image_pixels - self.filepaths = MediaFilePaths(hs.config.media_store_path) + + self.primary_base_path = hs.config.media_store_path + self.filepaths = MediaFilePaths(self.primary_base_path) + self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.thumbnail_requirements = hs.config.thumbnail_requirements self.remote_media_linearizer = Linearizer(name="media_remote") self.recently_accessed_remotes = set() + self.recently_accessed_locals = set() + + self.federation_domain_whitelist = hs.config.federation_domain_whitelist + + # List of StorageProviders where we should search for media and + # potentially upload to. + storage_providers = [] + + for clz, provider_config, wrapper_config in hs.config.media_storage_providers: + backend = clz(hs, provider_config) + provider = StorageProviderWrapper( + backend, + store_local=wrapper_config.store_local, + store_remote=wrapper_config.store_remote, + store_synchronous=wrapper_config.store_synchronous, + ) + storage_providers.append(provider) + + self.media_storage = MediaStorage( + self.hs, self.primary_base_path, self.filepaths, storage_providers, + ) self.clock.looping_call( - self._update_recently_accessed_remotes, - UPDATE_RECENTLY_ACCESSED_REMOTES_TS + self._update_recently_accessed, + UPDATE_RECENTLY_ACCESSED_TS, ) @defer.inlineCallbacks - def _update_recently_accessed_remotes(self): - media = self.recently_accessed_remotes + def _update_recently_accessed(self): + remote_media = self.recently_accessed_remotes self.recently_accessed_remotes = set() + local_media = self.recently_accessed_locals + self.recently_accessed_locals = set() + yield self.store.update_cached_last_access_time( - media, self.clock.time_msec() + local_media, remote_media, self.clock.time_msec() ) - @staticmethod - def _makedirs(filepath): - dirname = os.path.dirname(filepath) - if not os.path.exists(dirname): - os.makedirs(dirname) + def mark_recently_accessed(self, server_name, media_id): + """Mark the given media as recently accessed. + + Args: + server_name (str|None): Origin server of media, or None if local + media_id (str): The media ID of the content + """ + if server_name: + self.recently_accessed_remotes.add((server_name, media_id)) + else: + self.recently_accessed_locals.add(media_id) @defer.inlineCallbacks def create_content(self, media_type, upload_name, content, content_length, auth_user): + """Store uploaded content for a local user and return the mxc URL + + Args: + media_type(str): The content type of the file + upload_name(str): The name of the file + content: A file like object that is the content to store + content_length(int): The length of the content + auth_user(str): The user_id of the uploader + + Returns: + Deferred[str]: The mxc url of the stored content + """ media_id = random_string(24) - fname = self.filepaths.local_media_filepath(media_id) - self._makedirs(fname) + file_info = FileInfo( + server_name=None, + file_id=media_id, + ) - # This shouldn't block for very long because the content will have - # already been uploaded at this point. - with open(fname, "wb") as f: - f.write(content) + fname = yield self.media_storage.store_file(content, file_info) logger.info("Stored local media in file %r", fname) @@ -110,131 +162,275 @@ class MediaRepository(object): media_length=content_length, user_id=auth_user, ) - media_info = { - "media_type": media_type, - "media_length": content_length, - } - yield self._generate_local_thumbnails(media_id, media_info) + yield self._generate_thumbnails( + None, media_id, media_id, media_type, + ) defer.returnValue("mxc://%s/%s" % (self.server_name, media_id)) @defer.inlineCallbacks - def get_remote_media(self, server_name, media_id): + def get_local_media(self, request, media_id, name): + """Responds to reqests for local media, if exists, or returns 404. + + Args: + request(twisted.web.http.Request) + media_id (str): The media ID of the content. (This is the same as + the file_id for local content.) + name (str|None): Optional name that, if specified, will be used as + the filename in the Content-Disposition header of the response. + + Returns: + Deferred: Resolves once a response has successfully been written + to request + """ + media_info = yield self.store.get_local_media(media_id) + if not media_info or media_info["quarantined_by"]: + respond_404(request) + return + + self.mark_recently_accessed(None, media_id) + + media_type = media_info["media_type"] + media_length = media_info["media_length"] + upload_name = name if name else media_info["upload_name"] + url_cache = media_info["url_cache"] + + file_info = FileInfo( + None, media_id, + url_cache=url_cache, + ) + + responder = yield self.media_storage.fetch_media(file_info) + yield respond_with_responder( + request, responder, media_type, media_length, upload_name, + ) + + @defer.inlineCallbacks + def get_remote_media(self, request, server_name, media_id, name): + """Respond to requests for remote media. + + Args: + request(twisted.web.http.Request) + server_name (str): Remote server_name where the media originated. + media_id (str): The media ID of the content (as defined by the + remote server). + name (str|None): Optional name that, if specified, will be used as + the filename in the Content-Disposition header of the response. + + Returns: + Deferred: Resolves once a response has successfully been written + to request + """ + if ( + self.federation_domain_whitelist is not None and + server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(server_name) + + self.mark_recently_accessed(server_name, media_id) + + # We linearize here to ensure that we don't try and download remote + # media multiple times concurrently + key = (server_name, media_id) + with (yield self.remote_media_linearizer.queue(key)): + responder, media_info = yield self._get_remote_media_impl( + server_name, media_id, + ) + + # We deliberately stream the file outside the lock + if responder: + media_type = media_info["media_type"] + media_length = media_info["media_length"] + upload_name = name if name else media_info["upload_name"] + yield respond_with_responder( + request, responder, media_type, media_length, upload_name, + ) + else: + respond_404(request) + + @defer.inlineCallbacks + def get_remote_media_info(self, server_name, media_id): + """Gets the media info associated with the remote file, downloading + if necessary. + + Args: + server_name (str): Remote server_name where the media originated. + media_id (str): The media ID of the content (as defined by the + remote server). + + Returns: + Deferred[dict]: The media_info of the file + """ + if ( + self.federation_domain_whitelist is not None and + server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(server_name) + + # We linearize here to ensure that we don't try and download remote + # media multiple times concurrently key = (server_name, media_id) with (yield self.remote_media_linearizer.queue(key)): - media_info = yield self._get_remote_media_impl(server_name, media_id) + responder, media_info = yield self._get_remote_media_impl( + server_name, media_id, + ) + + # Ensure we actually use the responder so that it releases resources + if responder: + with responder: + pass + defer.returnValue(media_info) @defer.inlineCallbacks def _get_remote_media_impl(self, server_name, media_id): + """Looks for media in local cache, if not there then attempt to + download from remote server. + + Args: + server_name (str): Remote server_name where the media originated. + media_id (str): The media ID of the content (as defined by the + remote server). + + Returns: + Deferred[(Responder, media_info)] + """ media_info = yield self.store.get_cached_remote_media( server_name, media_id ) - if not media_info: - media_info = yield self._download_remote_file( - server_name, media_id - ) - elif media_info["quarantined_by"]: - raise NotFoundError() + + # file_id is the ID we use to track the file locally. If we've already + # seen the file then reuse the existing ID, otherwise genereate a new + # one. + if media_info: + file_id = media_info["filesystem_id"] else: - self.recently_accessed_remotes.add((server_name, media_id)) - yield self.store.update_cached_last_access_time( - [(server_name, media_id)], self.clock.time_msec() - ) - defer.returnValue(media_info) + file_id = random_string(24) - @defer.inlineCallbacks - def _download_remote_file(self, server_name, media_id): - file_id = random_string(24) + file_info = FileInfo(server_name, file_id) + + # If we have an entry in the DB, try and look for it + if media_info: + if media_info["quarantined_by"]: + logger.info("Media is quarantined") + raise NotFoundError() + + responder = yield self.media_storage.fetch_media(file_info) + if responder: + defer.returnValue((responder, media_info)) + + # Failed to find the file anywhere, lets download it. - fname = self.filepaths.remote_media_filepath( - server_name, file_id + media_info = yield self._download_remote_file( + server_name, media_id, file_id ) - self._makedirs(fname) - try: - with open(fname, "wb") as f: - request_path = "/".join(( - "/_matrix/media/v1/download", server_name, media_id, - )) + responder = yield self.media_storage.fetch_media(file_info) + defer.returnValue((responder, media_info)) + + @defer.inlineCallbacks + def _download_remote_file(self, server_name, media_id, file_id): + """Attempt to download the remote file from the given server name, + using the given file_id as the local id. + + Args: + server_name (str): Originating server + media_id (str): The media ID of the content (as defined by the + remote server). This is different than the file_id, which is + locally generated. + file_id (str): Local file ID + + Returns: + Deferred[MediaInfo] + """ + + file_info = FileInfo( + server_name=server_name, + file_id=file_id, + ) + + with self.media_storage.store_into_file(file_info) as (f, fname, finish): + request_path = "/".join(( + "/_matrix/media/v1/download", server_name, media_id, + )) + try: + length, headers = yield self.client.get_file( + server_name, request_path, output_stream=f, + max_size=self.max_upload_size, args={ + # tell the remote server to 404 if it doesn't + # recognise the server_name, to make sure we don't + # end up with a routing loop. + "allow_remote": "false", + } + ) + except twisted.internet.error.DNSLookupError as e: + logger.warn("HTTP error fetching remote media %s/%s: %r", + server_name, media_id, e) + raise NotFoundError() + + except HttpResponseException as e: + logger.warn("HTTP error fetching remote media %s/%s: %s", + server_name, media_id, e.response) + if e.code == twisted.web.http.NOT_FOUND: + raise SynapseError.from_http_response_exception(e) + raise SynapseError(502, "Failed to fetch remote media") + + except SynapseError: + logger.exception("Failed to fetch remote media %s/%s", + server_name, media_id) + raise + except NotRetryingDestination: + logger.warn("Not retrying destination %r", server_name) + raise SynapseError(502, "Failed to fetch remote media") + except Exception: + logger.exception("Failed to fetch remote media %s/%s", + server_name, media_id) + raise SynapseError(502, "Failed to fetch remote media") + + yield finish() + + media_type = headers["Content-Type"][0] + + time_now_ms = self.clock.time_msec() + + content_disposition = headers.get("Content-Disposition", None) + if content_disposition: + _, params = cgi.parse_header(content_disposition[0],) + upload_name = None + + # First check if there is a valid UTF-8 filename + upload_name_utf8 = params.get("filename*", None) + if upload_name_utf8: + if upload_name_utf8.lower().startswith("utf-8''"): + upload_name = upload_name_utf8[7:] + + # If there isn't check for an ascii name. + if not upload_name: + upload_name_ascii = params.get("filename", None) + if upload_name_ascii and is_ascii(upload_name_ascii): + upload_name = upload_name_ascii + + if upload_name: + upload_name = urlparse.unquote(upload_name) try: - length, headers = yield self.client.get_file( - server_name, request_path, output_stream=f, - max_size=self.max_upload_size, args={ - # tell the remote server to 404 if it doesn't - # recognise the server_name, to make sure we don't - # end up with a routing loop. - "allow_remote": "false", - } - ) - except twisted.internet.error.DNSLookupError as e: - logger.warn("HTTP error fetching remote media %s/%s: %r", - server_name, media_id, e) - raise NotFoundError() - - except HttpResponseException as e: - logger.warn("HTTP error fetching remote media %s/%s: %s", - server_name, media_id, e.response) - if e.code == twisted.web.http.NOT_FOUND: - raise SynapseError.from_http_response_exception(e) - raise SynapseError(502, "Failed to fetch remote media") - - except SynapseError: - logger.exception("Failed to fetch remote media %s/%s", - server_name, media_id) - raise - except NotRetryingDestination: - logger.warn("Not retrying destination %r", server_name) - raise SynapseError(502, "Failed to fetch remote media") - except Exception: - logger.exception("Failed to fetch remote media %s/%s", - server_name, media_id) - raise SynapseError(502, "Failed to fetch remote media") - - media_type = headers["Content-Type"][0] - time_now_ms = self.clock.time_msec() - - content_disposition = headers.get("Content-Disposition", None) - if content_disposition: - _, params = cgi.parse_header(content_disposition[0],) - upload_name = None - - # First check if there is a valid UTF-8 filename - upload_name_utf8 = params.get("filename*", None) - if upload_name_utf8: - if upload_name_utf8.lower().startswith("utf-8''"): - upload_name = upload_name_utf8[7:] - - # If there isn't check for an ascii name. - if not upload_name: - upload_name_ascii = params.get("filename", None) - if upload_name_ascii and is_ascii(upload_name_ascii): - upload_name = upload_name_ascii - - if upload_name: - upload_name = urlparse.unquote(upload_name) - try: - upload_name = upload_name.decode("utf-8") - except UnicodeDecodeError: - upload_name = None - else: - upload_name = None - - logger.info("Stored remote media in file %r", fname) - - yield self.store.store_cached_remote_media( - origin=server_name, - media_id=media_id, - media_type=media_type, - time_now_ms=self.clock.time_msec(), - upload_name=upload_name, - media_length=length, - filesystem_id=file_id, - ) - except: - os.remove(fname) - raise + upload_name = upload_name.decode("utf-8") + except UnicodeDecodeError: + upload_name = None + else: + upload_name = None + + logger.info("Stored remote media in file %r", fname) + + yield self.store.store_cached_remote_media( + origin=server_name, + media_id=media_id, + media_type=media_type, + time_now_ms=self.clock.time_msec(), + upload_name=upload_name, + media_length=length, + filesystem_id=file_id, + ) media_info = { "media_type": media_type, @@ -244,8 +440,8 @@ class MediaRepository(object): "filesystem_id": file_id, } - yield self._generate_remote_thumbnails( - server_name, media_id, media_info + yield self._generate_thumbnails( + server_name, media_id, file_id, media_type, ) defer.returnValue(media_info) @@ -253,9 +449,8 @@ class MediaRepository(object): def _get_thumbnail_requirements(self, media_type): return self.thumbnail_requirements.get(media_type, ()) - def _generate_thumbnail(self, input_path, t_path, t_width, t_height, + def _generate_thumbnail(self, thumbnailer, t_width, t_height, t_method, t_type): - thumbnailer = Thumbnailer(input_path) m_width = thumbnailer.width m_height = thumbnailer.height @@ -267,75 +462,125 @@ class MediaRepository(object): return if t_method == "crop": - t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) + t_byte_source = thumbnailer.crop(t_width, t_height, t_type) elif t_method == "scale": t_width, t_height = thumbnailer.aspect(t_width, t_height) t_width = min(m_width, t_width) t_height = min(m_height, t_height) - t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) + t_byte_source = thumbnailer.scale(t_width, t_height, t_type) else: - t_len = None + t_byte_source = None - return t_len + return t_byte_source @defer.inlineCallbacks def generate_local_exact_thumbnail(self, media_id, t_width, t_height, - t_method, t_type): - input_path = self.filepaths.local_media_filepath(media_id) - - t_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) + t_method, t_type, url_cache): + input_path = yield self.media_storage.ensure_media_is_in_local_cache(FileInfo( + None, media_id, url_cache=url_cache, + )) - t_len = yield preserve_context_over_fn( - threads.deferToThread, + thumbnailer = Thumbnailer(input_path) + t_byte_source = yield make_deferred_yieldable(threads.deferToThread( self._generate_thumbnail, - input_path, t_path, t_width, t_height, t_method, t_type - ) + thumbnailer, t_width, t_height, t_method, t_type + )) + + if t_byte_source: + try: + file_info = FileInfo( + server_name=None, + file_id=media_id, + url_cache=url_cache, + thumbnail=True, + thumbnail_width=t_width, + thumbnail_height=t_height, + thumbnail_method=t_method, + thumbnail_type=t_type, + ) + + output_path = yield self.media_storage.store_file( + t_byte_source, file_info, + ) + finally: + t_byte_source.close() + + logger.info("Stored thumbnail in file %r", output_path) + + t_len = os.path.getsize(output_path) - if t_len: yield self.store.store_local_thumbnail( media_id, t_width, t_height, t_type, t_method, t_len ) - defer.returnValue(t_path) + defer.returnValue(output_path) @defer.inlineCallbacks def generate_remote_exact_thumbnail(self, server_name, file_id, media_id, t_width, t_height, t_method, t_type): - input_path = self.filepaths.remote_media_filepath(server_name, file_id) - - t_path = self.filepaths.remote_media_thumbnail( - server_name, file_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) + input_path = yield self.media_storage.ensure_media_is_in_local_cache(FileInfo( + server_name, file_id, url_cache=False, + )) - t_len = yield preserve_context_over_fn( - threads.deferToThread, + thumbnailer = Thumbnailer(input_path) + t_byte_source = yield make_deferred_yieldable(threads.deferToThread( self._generate_thumbnail, - input_path, t_path, t_width, t_height, t_method, t_type - ) + thumbnailer, t_width, t_height, t_method, t_type + )) + + if t_byte_source: + try: + file_info = FileInfo( + server_name=server_name, + file_id=media_id, + thumbnail=True, + thumbnail_width=t_width, + thumbnail_height=t_height, + thumbnail_method=t_method, + thumbnail_type=t_type, + ) + + output_path = yield self.media_storage.store_file( + t_byte_source, file_info, + ) + finally: + t_byte_source.close() + + logger.info("Stored thumbnail in file %r", output_path) + + t_len = os.path.getsize(output_path) - if t_len: yield self.store.store_remote_media_thumbnail( server_name, media_id, file_id, t_width, t_height, t_type, t_method, t_len ) - defer.returnValue(t_path) + defer.returnValue(output_path) @defer.inlineCallbacks - def _generate_local_thumbnails(self, media_id, media_info, url_cache=False): - media_type = media_info["media_type"] + def _generate_thumbnails(self, server_name, media_id, file_id, media_type, + url_cache=False): + """Generate and store thumbnails for an image. + + Args: + server_name (str|None): The server name if remote media, else None if local + media_id (str): The media ID of the content. (This is the same as + the file_id for local content) + file_id (str): Local file ID + media_type (str): The content type of the file + url_cache (bool): If we are thumbnailing images downloaded for the URL cache, + used exclusively by the url previewer + + Returns: + Deferred[dict]: Dict with "width" and "height" keys of original image + """ requirements = self._get_thumbnail_requirements(media_type) if not requirements: return - if url_cache: - input_path = self.filepaths.url_cache_filepath(media_id) - else: - input_path = self.filepaths.local_media_filepath(media_id) + input_path = yield self.media_storage.ensure_media_is_in_local_cache(FileInfo( + server_name, file_id, url_cache=url_cache, + )) thumbnailer = Thumbnailer(input_path) m_width = thumbnailer.width @@ -348,135 +593,68 @@ class MediaRepository(object): ) return - local_thumbnails = [] - - def generate_thumbnails(): - scales = set() - crops = set() - for r_width, r_height, r_method, r_type in requirements: - if r_method == "scale": - t_width, t_height = thumbnailer.aspect(r_width, r_height) - scales.add(( - min(m_width, t_width), min(m_height, t_height), r_type, - )) - elif r_method == "crop": - crops.add((r_width, r_height, r_type)) - - for t_width, t_height, t_type in scales: - t_method = "scale" - if url_cache: - t_path = self.filepaths.url_cache_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - else: - t_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) - - local_thumbnails.append(( - media_id, t_width, t_height, t_type, t_method, t_len + # We deduplicate the thumbnail sizes by ignoring the cropped versions if + # they have the same dimensions of a scaled one. + thumbnails = {} + for r_width, r_height, r_method, r_type in requirements: + if r_method == "crop": + thumbnails.setdefault((r_width, r_height, r_type), r_method) + elif r_method == "scale": + t_width, t_height = thumbnailer.aspect(r_width, r_height) + t_width = min(m_width, t_width) + t_height = min(m_height, t_height) + thumbnails[(t_width, t_height, r_type)] = r_method + + # Now we generate the thumbnails for each dimension, store it + for (t_width, t_height, t_type), t_method in iteritems(thumbnails): + # Generate the thumbnail + if t_method == "crop": + t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + thumbnailer.crop, + t_width, t_height, t_type, )) - - for t_width, t_height, t_type in crops: - if (t_width, t_height, t_type) in scales: - # If the aspect ratio of the cropped thumbnail matches a purely - # scaled one then there is no point in calculating a separate - # thumbnail. - continue - t_method = "crop" - if url_cache: - t_path = self.filepaths.url_cache_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - else: - t_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) - local_thumbnails.append(( - media_id, t_width, t_height, t_type, t_method, t_len + elif t_method == "scale": + t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + thumbnailer.scale, + t_width, t_height, t_type, )) + else: + logger.error("Unrecognized method: %r", t_method) + continue + + if not t_byte_source: + continue + + try: + file_info = FileInfo( + server_name=server_name, + file_id=file_id, + thumbnail=True, + thumbnail_width=t_width, + thumbnail_height=t_height, + thumbnail_method=t_method, + thumbnail_type=t_type, + url_cache=url_cache, + ) - yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) - - for l in local_thumbnails: - yield self.store.store_local_thumbnail(*l) - - defer.returnValue({ - "width": m_width, - "height": m_height, - }) - - @defer.inlineCallbacks - def _generate_remote_thumbnails(self, server_name, media_id, media_info): - media_type = media_info["media_type"] - file_id = media_info["filesystem_id"] - requirements = self._get_thumbnail_requirements(media_type) - if not requirements: - return - - remote_thumbnails = [] + output_path = yield self.media_storage.store_file( + t_byte_source, file_info, + ) + finally: + t_byte_source.close() - input_path = self.filepaths.remote_media_filepath(server_name, file_id) - thumbnailer = Thumbnailer(input_path) - m_width = thumbnailer.width - m_height = thumbnailer.height + t_len = os.path.getsize(output_path) - def generate_thumbnails(): - if m_width * m_height >= self.max_image_pixels: - logger.info( - "Image too large to thumbnail %r x %r > %r", - m_width, m_height, self.max_image_pixels - ) - return - - scales = set() - crops = set() - for r_width, r_height, r_method, r_type in requirements: - if r_method == "scale": - t_width, t_height = thumbnailer.aspect(r_width, r_height) - scales.add(( - min(m_width, t_width), min(m_height, t_height), r_type, - )) - elif r_method == "crop": - crops.add((r_width, r_height, r_type)) - - for t_width, t_height, t_type in scales: - t_method = "scale" - t_path = self.filepaths.remote_media_thumbnail( - server_name, file_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) - remote_thumbnails.append([ + # Write to database + if server_name: + yield self.store.store_remote_media_thumbnail( server_name, media_id, file_id, t_width, t_height, t_type, t_method, t_len - ]) - - for t_width, t_height, t_type in crops: - if (t_width, t_height, t_type) in scales: - # If the aspect ratio of the cropped thumbnail matches a purely - # scaled one then there is no point in calculating a separate - # thumbnail. - continue - t_method = "crop" - t_path = self.filepaths.remote_media_thumbnail( - server_name, file_id, t_width, t_height, t_type, t_method ) - self._makedirs(t_path) - t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) - remote_thumbnails.append([ - server_name, media_id, file_id, - t_width, t_height, t_type, t_method, t_len - ]) - - yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) - - for r in remote_thumbnails: - yield self.store.store_remote_media_thumbnail(*r) + else: + yield self.store.store_local_thumbnail( + media_id, t_width, t_height, t_type, t_method, t_len + ) defer.returnValue({ "width": m_width, @@ -497,6 +675,8 @@ class MediaRepository(object): logger.info("Deleting: %r", key) + # TODO: Should we delete from the backup store + with (yield self.remote_media_linearizer.queue(key)): full_path = self.filepaths.remote_media_filepath(origin, file_id) try: @@ -571,7 +751,11 @@ class MediaRepositoryResource(Resource): self.putChild("upload", UploadResource(hs, media_repo)) self.putChild("download", DownloadResource(hs, media_repo)) - self.putChild("thumbnail", ThumbnailResource(hs, media_repo)) + self.putChild("thumbnail", ThumbnailResource( + hs, media_repo, media_repo.media_storage, + )) self.putChild("identicon", IdenticonResource()) if hs.config.url_preview_enabled: - self.putChild("preview_url", PreviewUrlResource(hs, media_repo)) + self.putChild("preview_url", PreviewUrlResource( + hs, media_repo, media_repo.media_storage, + )) diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py new file mode 100644 index 0000000000..b25993fcb5 --- /dev/null +++ b/synapse/rest/media/v1/media_storage.py @@ -0,0 +1,265 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vecotr Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import logging +import os +import shutil +import sys + +import six + +from twisted.internet import defer, threads +from twisted.protocols.basic import FileSender + +from synapse.util.file_consumer import BackgroundFileConsumer +from synapse.util.logcontext import make_deferred_yieldable + +from ._base import Responder + +logger = logging.getLogger(__name__) + + +class MediaStorage(object): + """Responsible for storing/fetching files from local sources. + + Args: + hs (synapse.server.Homeserver) + local_media_directory (str): Base path where we store media on disk + filepaths (MediaFilePaths) + storage_providers ([StorageProvider]): List of StorageProvider that are + used to fetch and store files. + """ + + def __init__(self, hs, local_media_directory, filepaths, storage_providers): + self.hs = hs + self.local_media_directory = local_media_directory + self.filepaths = filepaths + self.storage_providers = storage_providers + + @defer.inlineCallbacks + def store_file(self, source, file_info): + """Write `source` to the on disk media store, and also any other + configured storage providers + + Args: + source: A file like object that should be written + file_info (FileInfo): Info about the file to store + + Returns: + Deferred[str]: the file path written to in the primary media store + """ + + with self.store_into_file(file_info) as (f, fname, finish_cb): + # Write to the main repository + yield make_deferred_yieldable(threads.deferToThread( + _write_file_synchronously, source, f, + )) + yield finish_cb() + + defer.returnValue(fname) + + @contextlib.contextmanager + def store_into_file(self, file_info): + """Context manager used to get a file like object to write into, as + described by file_info. + + Actually yields a 3-tuple (file, fname, finish_cb), where file is a file + like object that can be written to, fname is the absolute path of file + on disk, and finish_cb is a function that returns a Deferred. + + fname can be used to read the contents from after upload, e.g. to + generate thumbnails. + + finish_cb must be called and waited on after the file has been + successfully been written to. Should not be called if there was an + error. + + Args: + file_info (FileInfo): Info about the file to store + + Example: + + with media_storage.store_into_file(info) as (f, fname, finish_cb): + # .. write into f ... + yield finish_cb() + """ + + path = self._file_info_to_path(file_info) + fname = os.path.join(self.local_media_directory, path) + + dirname = os.path.dirname(fname) + if not os.path.exists(dirname): + os.makedirs(dirname) + + finished_called = [False] + + @defer.inlineCallbacks + def finish(): + for provider in self.storage_providers: + yield provider.store_file(path, file_info) + + finished_called[0] = True + + try: + with open(fname, "wb") as f: + yield f, fname, finish + except Exception: + t, v, tb = sys.exc_info() + try: + os.remove(fname) + except Exception: + pass + six.reraise(t, v, tb) + + if not finished_called: + raise Exception("Finished callback not called") + + @defer.inlineCallbacks + def fetch_media(self, file_info): + """Attempts to fetch media described by file_info from the local cache + and configured storage providers. + + Args: + file_info (FileInfo) + + Returns: + Deferred[Responder|None]: Returns a Responder if the file was found, + otherwise None. + """ + + path = self._file_info_to_path(file_info) + local_path = os.path.join(self.local_media_directory, path) + if os.path.exists(local_path): + defer.returnValue(FileResponder(open(local_path, "rb"))) + + for provider in self.storage_providers: + res = yield provider.fetch(path, file_info) + if res: + defer.returnValue(res) + + defer.returnValue(None) + + @defer.inlineCallbacks + def ensure_media_is_in_local_cache(self, file_info): + """Ensures that the given file is in the local cache. Attempts to + download it from storage providers if it isn't. + + Args: + file_info (FileInfo) + + Returns: + Deferred[str]: Full path to local file + """ + path = self._file_info_to_path(file_info) + local_path = os.path.join(self.local_media_directory, path) + if os.path.exists(local_path): + defer.returnValue(local_path) + + dirname = os.path.dirname(local_path) + if not os.path.exists(dirname): + os.makedirs(dirname) + + for provider in self.storage_providers: + res = yield provider.fetch(path, file_info) + if res: + with res: + consumer = BackgroundFileConsumer( + open(local_path, "w"), self.hs.get_reactor()) + yield res.write_to_consumer(consumer) + yield consumer.wait() + defer.returnValue(local_path) + + raise Exception("file could not be found") + + def _file_info_to_path(self, file_info): + """Converts file_info into a relative path. + + The path is suitable for storing files under a directory, e.g. used to + store files on local FS under the base media repository directory. + + Args: + file_info (FileInfo) + + Returns: + str + """ + if file_info.url_cache: + if file_info.thumbnail: + return self.filepaths.url_cache_thumbnail_rel( + media_id=file_info.file_id, + width=file_info.thumbnail_width, + height=file_info.thumbnail_height, + content_type=file_info.thumbnail_type, + method=file_info.thumbnail_method, + ) + return self.filepaths.url_cache_filepath_rel(file_info.file_id) + + if file_info.server_name: + if file_info.thumbnail: + return self.filepaths.remote_media_thumbnail_rel( + server_name=file_info.server_name, + file_id=file_info.file_id, + width=file_info.thumbnail_width, + height=file_info.thumbnail_height, + content_type=file_info.thumbnail_type, + method=file_info.thumbnail_method + ) + return self.filepaths.remote_media_filepath_rel( + file_info.server_name, file_info.file_id, + ) + + if file_info.thumbnail: + return self.filepaths.local_media_thumbnail_rel( + media_id=file_info.file_id, + width=file_info.thumbnail_width, + height=file_info.thumbnail_height, + content_type=file_info.thumbnail_type, + method=file_info.thumbnail_method + ) + return self.filepaths.local_media_filepath_rel( + file_info.file_id, + ) + + +def _write_file_synchronously(source, dest): + """Write `source` to the file like `dest` synchronously. Should be called + from a thread. + + Args: + source: A file like object that's to be written + dest: A file like object to be written to + """ + source.seek(0) # Ensure we read from the start of the file + shutil.copyfileobj(source, dest) + + +class FileResponder(Responder): + """Wraps an open file that can be sent to a request. + + Args: + open_file (file): A file like object to be streamed ot the client, + is closed when finished streaming. + """ + def __init__(self, open_file): + self.open_file = open_file + + def write_to_consumer(self, consumer): + return make_deferred_yieldable( + FileSender().beginFileTransfer(self.open_file, consumer) + ) + + def __exit__(self, exc_type, exc_val, exc_tb): + self.open_file.close() diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index b81a336c5d..b70b15c4c2 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -12,80 +12,98 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import cgi +import datetime +import errno +import fnmatch +import itertools +import logging +import os +import re +import shutil +import sys +import traceback + +from six import string_types +from six.moves import urllib_parse as urlparse + +from canonicaljson import json -from twisted.web.server import NOT_DONE_YET from twisted.internet import defer from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET -from synapse.api.errors import ( - SynapseError, Codes, -) -from synapse.util.stringutils import random_string -from synapse.util.caches.expiringcache import ExpiringCache +from synapse.api.errors import Codes, SynapseError from synapse.http.client import SpiderHttpClient from synapse.http.server import ( - request_handler, respond_with_json_bytes + respond_with_json, + respond_with_json_bytes, + wrap_json_request_handler, ) +from synapse.http.servlet import parse_integer, parse_string from synapse.util.async import ObservableDeferred -from synapse.util.stringutils import is_ascii +from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.logcontext import make_deferred_yieldable, run_in_background +from synapse.util.stringutils import is_ascii, random_string -import os -import re -import fnmatch -import cgi -import ujson as json -import urlparse -import itertools +from ._base import FileInfo -import logging logger = logging.getLogger(__name__) class PreviewUrlResource(Resource): isLeaf = True - def __init__(self, hs, media_repo): + def __init__(self, hs, media_repo, media_storage): Resource.__init__(self) self.auth = hs.get_auth() self.clock = hs.get_clock() - self.version_string = hs.version_string self.filepaths = media_repo.filepaths self.max_spider_size = hs.config.max_spider_size self.server_name = hs.hostname self.store = hs.get_datastore() self.client = SpiderHttpClient(hs) self.media_repo = media_repo + self.primary_base_path = media_repo.primary_base_path + self.media_storage = media_storage self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist - # simple memory cache mapping urls to OG metadata - self.cache = ExpiringCache( + # memory cache mapping urls to an ObservableDeferred returning + # JSON-encoded OG metadata + self._cache = ExpiringCache( cache_name="url_previews", clock=self.clock, # don't spider URLs more often than once an hour expiry_ms=60 * 60 * 1000, ) - self.cache.start() + self._cache.start() - self.downloads = {} + self._cleaner_loop = self.clock.looping_call( + self._expire_url_cache_data, 10 * 1000 + ) + + def render_OPTIONS(self, request): + return respond_with_json(request, 200, {}, send_cors=True) def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET - @request_handler() + @wrap_json_request_handler @defer.inlineCallbacks def _async_render_GET(self, request): # XXX: if get_user_by_req fails, what should we do in an async render? requester = yield self.auth.get_user_by_req(request) - url = request.args.get("url")[0] + url = parse_string(request, "url") if "ts" in request.args: - ts = int(request.args.get("ts")[0]) + ts = parse_integer(request, "ts") else: ts = self.clock.time_msec() + # XXX: we could move this into _do_preview if we wanted. url_tuple = urlparse.urlsplit(url) for entry in self.url_preview_url_blacklist: match = True @@ -118,53 +136,62 @@ class PreviewUrlResource(Resource): Codes.UNKNOWN ) - # first check the memory cache - good to handle all the clients on this - # HS thundering away to preview the same URL at the same time. - og = self.cache.get(url) - if og: - respond_with_json_bytes(request, 200, json.dumps(og), send_cors=True) - return + # the in-memory cache: + # * ensures that only one request is active at a time + # * takes load off the DB for the thundering herds + # * also caches any failures (unlike the DB) so we don't keep + # requesting the same endpoint + + observable = self._cache.get(url) + + if not observable: + download = run_in_background( + self._do_preview, + url, requester.user, ts, + ) + observable = ObservableDeferred( + download, + consumeErrors=True + ) + self._cache[url] = observable + else: + logger.info("Returning cached response") + + og = yield make_deferred_yieldable(observable.observe()) + respond_with_json_bytes(request, 200, og, send_cors=True) - # then check the URL cache in the DB (which will also provide us with + @defer.inlineCallbacks + def _do_preview(self, url, user, ts): + """Check the db, and download the URL and build a preview + + Args: + url (str): + user (str): + ts (int): + + Returns: + Deferred[str]: json-encoded og data + """ + # check the URL cache in the DB (which will also provide us with # historical previews, if we have any) cache_result = yield self.store.get_url_cache(url, ts) if ( cache_result and - cache_result["download_ts"] + cache_result["expires"] > ts and + cache_result["expires_ts"] > ts and cache_result["response_code"] / 100 == 2 ): - respond_with_json_bytes( - request, 200, cache_result["og"].encode('utf-8'), - send_cors=True - ) + defer.returnValue(cache_result["og"]) return - # Ensure only one download for a given URL is active at a time - download = self.downloads.get(url) - if download is None: - download = self._download_url(url, requester.user) - download = ObservableDeferred( - download, - consumeErrors=True - ) - self.downloads[url] = download - - @download.addBoth - def callback(media_info): - del self.downloads[url] - return media_info - media_info = yield download.observe() - - # FIXME: we should probably update our cache now anyway, so that - # even if the OG calculation raises, we don't keep hammering on the - # remote server. For now, leave it uncached to aid debugging OG - # calculation problems + media_info = yield self._download_url(url, user) logger.debug("got media_info of '%s'" % media_info) if _is_media(media_info['media_type']): - dims = yield self.media_repo._generate_local_thumbnails( - media_info['filesystem_id'], media_info, url_cache=True, + file_id = media_info['filesystem_id'] + dims = yield self.media_repo._generate_thumbnails( + None, file_id, file_id, media_info["media_type"], + url_cache=True, ) og = { @@ -204,13 +231,15 @@ class PreviewUrlResource(Resource): # just rely on the caching on the master request to speed things up. if 'og:image' in og and og['og:image']: image_info = yield self._download_url( - _rebase_url(og['og:image'], media_info['uri']), requester.user + _rebase_url(og['og:image'], media_info['uri']), user ) if _is_media(image_info['media_type']): # TODO: make sure we don't choke on white-on-transparent images - dims = yield self.media_repo._generate_local_thumbnails( - image_info['filesystem_id'], image_info, url_cache=True, + file_id = image_info['filesystem_id'] + dims = yield self.media_repo._generate_thumbnails( + None, file_id, file_id, image_info["media_type"], + url_cache=True, ) if dims: og["og:image:width"] = dims['width'] @@ -231,21 +260,20 @@ class PreviewUrlResource(Resource): logger.debug("Calculated OG for %s as %s" % (url, og)) - # store OG in ephemeral in-memory cache - self.cache[url] = og + jsonog = json.dumps(og) # store OG in history-aware DB cache yield self.store.store_url_cache( url, media_info["response_code"], media_info["etag"], - media_info["expires"], - json.dumps(og), + media_info["expires"] + media_info["created_ts"], + jsonog, media_info["filesystem_id"], media_info["created_ts"], ) - respond_with_json_bytes(request, 200, json.dumps(og), send_cors=True) + defer.returnValue(jsonog) @defer.inlineCallbacks def _download_url(self, url, user): @@ -253,21 +281,36 @@ class PreviewUrlResource(Resource): # we're most likely being explicitly triggered by a human rather than a # bot, so are we really a robot? - # XXX: horrible duplication with base_resource's _download_remote_file() - file_id = random_string(24) + file_id = datetime.date.today().isoformat() + '_' + random_string(16) - fname = self.filepaths.url_cache_filepath(file_id) - self.media_repo._makedirs(fname) + file_info = FileInfo( + server_name=None, + file_id=file_id, + url_cache=True, + ) - try: - with open(fname, "wb") as f: + with self.media_storage.store_into_file(file_info) as (f, fname, finish): + try: logger.debug("Trying to get url '%s'" % url) length, headers, uri, code = yield self.client.get_file( url, output_stream=f, max_size=self.max_spider_size, ) + except Exception as e: # FIXME: pass through 404s and other error messages nicely + logger.warn("Error downloading %s: %r", url, e) + raise SynapseError( + 500, "Failed to download content: %s" % ( + traceback.format_exception_only(sys.exc_type, e), + ), + Codes.UNKNOWN, + ) + yield finish() - media_type = headers["Content-Type"][0] + try: + if "Content-Type" in headers: + media_type = headers["Content-Type"][0] + else: + media_type = "application/octet-stream" time_now_ms = self.clock.time_msec() content_disposition = headers.get("Content-Disposition", None) @@ -307,11 +350,11 @@ class PreviewUrlResource(Resource): ) except Exception as e: - os.remove(fname) - raise SynapseError( - 500, ("Failed to download content: %s" % e), - Codes.UNKNOWN - ) + logger.error("Error handling downloaded %s: %r", url, e) + # TODO: we really ought to delete the downloaded file in this + # case, since we won't have recorded it in the db, and will + # therefore not expire it. + raise defer.returnValue({ "media_type": media_type, @@ -328,6 +371,95 @@ class PreviewUrlResource(Resource): "etag": headers["ETag"][0] if "ETag" in headers else None, }) + @defer.inlineCallbacks + def _expire_url_cache_data(self): + """Clean up expired url cache content, media and thumbnails. + """ + # TODO: Delete from backup media store + + now = self.clock.time_msec() + + logger.info("Running url preview cache expiry") + + if not (yield self.store.has_completed_background_updates()): + logger.info("Still running DB updates; skipping expiry") + return + + # First we delete expired url cache entries + media_ids = yield self.store.get_expired_url_cache(now) + + removed_media = [] + for media_id in media_ids: + fname = self.filepaths.url_cache_filepath(media_id) + try: + os.remove(fname) + except OSError as e: + # If the path doesn't exist, meh + if e.errno != errno.ENOENT: + logger.warn("Failed to remove media: %r: %s", media_id, e) + continue + + removed_media.append(media_id) + + try: + dirs = self.filepaths.url_cache_filepath_dirs_to_delete(media_id) + for dir in dirs: + os.rmdir(dir) + except Exception: + pass + + yield self.store.delete_url_cache(removed_media) + + if removed_media: + logger.info("Deleted %d entries from url cache", len(removed_media)) + + # Now we delete old images associated with the url cache. + # These may be cached for a bit on the client (i.e., they + # may have a room open with a preview url thing open). + # So we wait a couple of days before deleting, just in case. + expire_before = now - 2 * 24 * 60 * 60 * 1000 + media_ids = yield self.store.get_url_cache_media_before(expire_before) + + removed_media = [] + for media_id in media_ids: + fname = self.filepaths.url_cache_filepath(media_id) + try: + os.remove(fname) + except OSError as e: + # If the path doesn't exist, meh + if e.errno != errno.ENOENT: + logger.warn("Failed to remove media: %r: %s", media_id, e) + continue + + try: + dirs = self.filepaths.url_cache_filepath_dirs_to_delete(media_id) + for dir in dirs: + os.rmdir(dir) + except Exception: + pass + + thumbnail_dir = self.filepaths.url_cache_thumbnail_directory(media_id) + try: + shutil.rmtree(thumbnail_dir) + except OSError as e: + # If the path doesn't exist, meh + if e.errno != errno.ENOENT: + logger.warn("Failed to remove media: %r: %s", media_id, e) + continue + + removed_media.append(media_id) + + try: + dirs = self.filepaths.url_cache_thumbnail_dirs_to_delete(media_id) + for dir in dirs: + os.rmdir(dir) + except Exception: + pass + + yield self.store.delete_url_cache_media(removed_media) + + logger.info("Deleted %d media from url cache", len(removed_media)) + def decode_and_calc_og(body, media_uri, request_encoding=None): from lxml import etree @@ -425,7 +557,14 @@ def _calc_og(tree, media_uri): from lxml import etree TAGS_TO_REMOVE = ( - "header", "nav", "aside", "footer", "script", "style", etree.Comment + "header", + "nav", + "aside", + "footer", + "script", + "noscript", + "style", + etree.Comment ) # Split all the text nodes into paragraphs (by splitting on new @@ -452,8 +591,8 @@ def _iterate_over_text(tree, *tags_to_ignore): # to be returned. elements = iter([tree]) while True: - el = elements.next() - if isinstance(el, basestring): + el = next(elements) + if isinstance(el, string_types): yield el elif el is not None and el.tag not in tags_to_ignore: # el.text is the text before the first child, so we can immediately diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py new file mode 100644 index 0000000000..7b9f8b4d79 --- /dev/null +++ b/synapse/rest/media/v1/storage_provider.py @@ -0,0 +1,144 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import shutil + +from twisted.internet import defer, threads + +from synapse.config._base import Config +from synapse.util.logcontext import run_in_background + +from .media_storage import FileResponder + +logger = logging.getLogger(__name__) + + +class StorageProvider(object): + """A storage provider is a service that can store uploaded media and + retrieve them. + """ + def store_file(self, path, file_info): + """Store the file described by file_info. The actual contents can be + retrieved by reading the file in file_info.upload_path. + + Args: + path (str): Relative path of file in local cache + file_info (FileInfo) + + Returns: + Deferred + """ + pass + + def fetch(self, path, file_info): + """Attempt to fetch the file described by file_info and stream it + into writer. + + Args: + path (str): Relative path of file in local cache + file_info (FileInfo) + + Returns: + Deferred(Responder): Returns a Responder if the provider has the file, + otherwise returns None. + """ + pass + + +class StorageProviderWrapper(StorageProvider): + """Wraps a storage provider and provides various config options + + Args: + backend (StorageProvider) + store_local (bool): Whether to store new local files or not. + store_synchronous (bool): Whether to wait for file to be successfully + uploaded, or todo the upload in the backgroud. + store_remote (bool): Whether remote media should be uploaded + """ + def __init__(self, backend, store_local, store_synchronous, store_remote): + self.backend = backend + self.store_local = store_local + self.store_synchronous = store_synchronous + self.store_remote = store_remote + + def store_file(self, path, file_info): + if not file_info.server_name and not self.store_local: + return defer.succeed(None) + + if file_info.server_name and not self.store_remote: + return defer.succeed(None) + + if self.store_synchronous: + return self.backend.store_file(path, file_info) + else: + # TODO: Handle errors. + def store(): + try: + return self.backend.store_file(path, file_info) + except Exception: + logger.exception("Error storing file") + run_in_background(store) + return defer.succeed(None) + + def fetch(self, path, file_info): + return self.backend.fetch(path, file_info) + + +class FileStorageProviderBackend(StorageProvider): + """A storage provider that stores files in a directory on a filesystem. + + Args: + hs (HomeServer) + config: The config returned by `parse_config`. + """ + + def __init__(self, hs, config): + self.cache_directory = hs.config.media_store_path + self.base_directory = config + + def store_file(self, path, file_info): + """See StorageProvider.store_file""" + + primary_fname = os.path.join(self.cache_directory, path) + backup_fname = os.path.join(self.base_directory, path) + + dirname = os.path.dirname(backup_fname) + if not os.path.exists(dirname): + os.makedirs(dirname) + + return threads.deferToThread( + shutil.copyfile, primary_fname, backup_fname, + ) + + def fetch(self, path, file_info): + """See StorageProvider.fetch""" + + backup_fname = os.path.join(self.base_directory, path) + if os.path.isfile(backup_fname): + return FileResponder(open(backup_fname, "rb")) + + @staticmethod + def parse_config(config): + """Called on startup to parse config supplied. This should parse + the config and raise if there is a problem. + + The returned value is passed into the constructor. + + In this case we only care about a single param, the directory, so let's + just pull that out. + """ + return Config.ensure_directory(config["directory"]) diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 68d56b2b10..5305e9175f 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -14,15 +14,22 @@ # limitations under the License. -from ._base import parse_media_id, respond_404, respond_with_file -from twisted.web.resource import Resource -from synapse.http.servlet import parse_string, parse_integer -from synapse.http.server import request_handler, set_cors_headers +import logging -from twisted.web.server import NOT_DONE_YET from twisted.internet import defer +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET -import logging +from synapse.http.server import set_cors_headers, wrap_json_request_handler +from synapse.http.servlet import parse_integer, parse_string + +from ._base import ( + FileInfo, + parse_media_id, + respond_404, + respond_with_file, + respond_with_responder, +) logger = logging.getLogger(__name__) @@ -30,22 +37,21 @@ logger = logging.getLogger(__name__) class ThumbnailResource(Resource): isLeaf = True - def __init__(self, hs, media_repo): + def __init__(self, hs, media_repo, media_storage): Resource.__init__(self) self.store = hs.get_datastore() - self.filepaths = media_repo.filepaths self.media_repo = media_repo + self.media_storage = media_storage self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.server_name = hs.hostname - self.version_string = hs.version_string self.clock = hs.get_clock() def render_GET(self, request): self._async_render_GET(request) return NOT_DONE_YET - @request_handler() + @wrap_json_request_handler @defer.inlineCallbacks def _async_render_GET(self, request): set_cors_headers(request) @@ -64,6 +70,7 @@ class ThumbnailResource(Resource): yield self._respond_local_thumbnail( request, media_id, width, height, method, m_type ) + self.media_repo.mark_recently_accessed(None, media_id) else: if self.dynamic_thumbnails: yield self._select_or_generate_remote_thumbnail( @@ -75,20 +82,20 @@ class ThumbnailResource(Resource): request, server_name, media_id, width, height, method, m_type ) + self.media_repo.mark_recently_accessed(server_name, media_id) @defer.inlineCallbacks def _respond_local_thumbnail(self, request, media_id, width, height, method, m_type): media_info = yield self.store.get_local_media(media_id) - if not media_info or media_info["quarantined_by"]: + if not media_info: + respond_404(request) + return + if media_info["quarantined_by"]: + logger.info("Media is quarantined") respond_404(request) return - - # if media_info["media_type"] == "image/svg+xml": - # file_path = self.filepaths.local_media_filepath(media_id) - # yield respond_with_file(request, media_info["media_type"], file_path) - # return thumbnail_infos = yield self.store.get_local_media_thumbnails(media_id) @@ -96,42 +103,39 @@ class ThumbnailResource(Resource): thumbnail_info = self._select_thumbnail( width, height, method, m_type, thumbnail_infos ) - t_width = thumbnail_info["thumbnail_width"] - t_height = thumbnail_info["thumbnail_height"] - t_type = thumbnail_info["thumbnail_type"] - t_method = thumbnail_info["thumbnail_method"] - - if media_info["url_cache"]: - # TODO: Check the file still exists, if it doesn't we can redownload - # it from the url `media_info["url_cache"]` - file_path = self.filepaths.url_cache_thumbnail( - media_id, t_width, t_height, t_type, t_method, - ) - else: - file_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method, - ) - yield respond_with_file(request, t_type, file_path) - else: - yield self._respond_default_thumbnail( - request, media_info, width, height, method, m_type, + file_info = FileInfo( + server_name=None, file_id=media_id, + url_cache=media_info["url_cache"], + thumbnail=True, + thumbnail_width=thumbnail_info["thumbnail_width"], + thumbnail_height=thumbnail_info["thumbnail_height"], + thumbnail_type=thumbnail_info["thumbnail_type"], + thumbnail_method=thumbnail_info["thumbnail_method"], ) + t_type = file_info.thumbnail_type + t_length = thumbnail_info["thumbnail_length"] + + responder = yield self.media_storage.fetch_media(file_info) + yield respond_with_responder(request, responder, t_type, t_length) + else: + logger.info("Couldn't find any generated thumbnails") + respond_404(request) + @defer.inlineCallbacks def _select_or_generate_local_thumbnail(self, request, media_id, desired_width, desired_height, desired_method, desired_type): media_info = yield self.store.get_local_media(media_id) - if not media_info or media_info["quarantined_by"]: + if not media_info: + respond_404(request) + return + if media_info["quarantined_by"]: + logger.info("Media is quarantined") respond_404(request) return - - # if media_info["media_type"] == "image/svg+xml": - # file_path = self.filepaths.local_media_filepath(media_id) - # yield respond_with_file(request, media_info["media_type"], file_path) - # return thumbnail_infos = yield self.store.get_local_media_thumbnails(media_id) for info in thumbnail_infos: @@ -141,46 +145,43 @@ class ThumbnailResource(Resource): t_type = info["thumbnail_type"] == desired_type if t_w and t_h and t_method and t_type: - if media_info["url_cache"]: - # TODO: Check the file still exists, if it doesn't we can redownload - # it from the url `media_info["url_cache"]` - file_path = self.filepaths.url_cache_thumbnail( - media_id, desired_width, desired_height, desired_type, - desired_method, - ) - else: - file_path = self.filepaths.local_media_thumbnail( - media_id, desired_width, desired_height, desired_type, - desired_method, - ) - yield respond_with_file(request, desired_type, file_path) - return - - logger.debug("We don't have a local thumbnail of that size. Generating") + file_info = FileInfo( + server_name=None, file_id=media_id, + url_cache=media_info["url_cache"], + thumbnail=True, + thumbnail_width=info["thumbnail_width"], + thumbnail_height=info["thumbnail_height"], + thumbnail_type=info["thumbnail_type"], + thumbnail_method=info["thumbnail_method"], + ) + + t_type = file_info.thumbnail_type + t_length = info["thumbnail_length"] + + responder = yield self.media_storage.fetch_media(file_info) + if responder: + yield respond_with_responder(request, responder, t_type, t_length) + return + + logger.debug("We don't have a thumbnail of that size. Generating") # Okay, so we generate one. file_path = yield self.media_repo.generate_local_exact_thumbnail( - media_id, desired_width, desired_height, desired_method, desired_type + media_id, desired_width, desired_height, desired_method, desired_type, + url_cache=media_info["url_cache"], ) if file_path: yield respond_with_file(request, desired_type, file_path) else: - yield self._respond_default_thumbnail( - request, media_info, desired_width, desired_height, - desired_method, desired_type, - ) + logger.warn("Failed to generate thumbnail") + respond_404(request) @defer.inlineCallbacks def _select_or_generate_remote_thumbnail(self, request, server_name, media_id, desired_width, desired_height, desired_method, desired_type): - media_info = yield self.media_repo.get_remote_media(server_name, media_id) - - # if media_info["media_type"] == "image/svg+xml": - # file_path = self.filepaths.remote_media_filepath(server_name, media_id) - # yield respond_with_file(request, media_info["media_type"], file_path) - # return + media_info = yield self.media_repo.get_remote_media_info(server_name, media_id) thumbnail_infos = yield self.store.get_remote_media_thumbnails( server_name, media_id, @@ -195,14 +196,24 @@ class ThumbnailResource(Resource): t_type = info["thumbnail_type"] == desired_type if t_w and t_h and t_method and t_type: - file_path = self.filepaths.remote_media_thumbnail( - server_name, file_id, desired_width, desired_height, - desired_type, desired_method, + file_info = FileInfo( + server_name=server_name, file_id=media_info["filesystem_id"], + thumbnail=True, + thumbnail_width=info["thumbnail_width"], + thumbnail_height=info["thumbnail_height"], + thumbnail_type=info["thumbnail_type"], + thumbnail_method=info["thumbnail_method"], ) - yield respond_with_file(request, desired_type, file_path) - return - logger.debug("We don't have a local thumbnail of that size. Generating") + t_type = file_info.thumbnail_type + t_length = info["thumbnail_length"] + + responder = yield self.media_storage.fetch_media(file_info) + if responder: + yield respond_with_responder(request, responder, t_type, t_length) + return + + logger.debug("We don't have a thumbnail of that size. Generating") # Okay, so we generate one. file_path = yield self.media_repo.generate_remote_exact_thumbnail( @@ -213,22 +224,16 @@ class ThumbnailResource(Resource): if file_path: yield respond_with_file(request, desired_type, file_path) else: - yield self._respond_default_thumbnail( - request, media_info, desired_width, desired_height, - desired_method, desired_type, - ) + logger.warn("Failed to generate thumbnail") + respond_404(request) @defer.inlineCallbacks def _respond_remote_thumbnail(self, request, server_name, media_id, width, height, method, m_type): # TODO: Don't download the whole remote file - # We should proxy the thumbnail from the remote server instead. - media_info = yield self.media_repo.get_remote_media(server_name, media_id) - - # if media_info["media_type"] == "image/svg+xml": - # file_path = self.filepaths.remote_media_filepath(server_name, media_id) - # yield respond_with_file(request, media_info["media_type"], file_path) - # return + # We should proxy the thumbnail from the remote server instead of + # downloading the remote file and generating our own thumbnails. + media_info = yield self.media_repo.get_remote_media_info(server_name, media_id) thumbnail_infos = yield self.store.get_remote_media_thumbnails( server_name, media_id, @@ -238,59 +243,23 @@ class ThumbnailResource(Resource): thumbnail_info = self._select_thumbnail( width, height, method, m_type, thumbnail_infos ) - t_width = thumbnail_info["thumbnail_width"] - t_height = thumbnail_info["thumbnail_height"] - t_type = thumbnail_info["thumbnail_type"] - t_method = thumbnail_info["thumbnail_method"] - file_id = thumbnail_info["filesystem_id"] + file_info = FileInfo( + server_name=server_name, file_id=media_info["filesystem_id"], + thumbnail=True, + thumbnail_width=thumbnail_info["thumbnail_width"], + thumbnail_height=thumbnail_info["thumbnail_height"], + thumbnail_type=thumbnail_info["thumbnail_type"], + thumbnail_method=thumbnail_info["thumbnail_method"], + ) + + t_type = file_info.thumbnail_type t_length = thumbnail_info["thumbnail_length"] - file_path = self.filepaths.remote_media_thumbnail( - server_name, file_id, t_width, t_height, t_type, t_method, - ) - yield respond_with_file(request, t_type, file_path, t_length) + responder = yield self.media_storage.fetch_media(file_info) + yield respond_with_responder(request, responder, t_type, t_length) else: - yield self._respond_default_thumbnail( - request, media_info, width, height, method, m_type, - ) - - @defer.inlineCallbacks - def _respond_default_thumbnail(self, request, media_info, width, height, - method, m_type): - # XXX: how is this meant to work? store.get_default_thumbnails - # appears to always return [] so won't this always 404? - media_type = media_info["media_type"] - top_level_type = media_type.split("/")[0] - sub_type = media_type.split("/")[-1].split(";")[0] - thumbnail_infos = yield self.store.get_default_thumbnails( - top_level_type, sub_type, - ) - if not thumbnail_infos: - thumbnail_infos = yield self.store.get_default_thumbnails( - top_level_type, "_default", - ) - if not thumbnail_infos: - thumbnail_infos = yield self.store.get_default_thumbnails( - "_default", "_default", - ) - if not thumbnail_infos: + logger.info("Failed to find any generated thumbnails") respond_404(request) - return - - thumbnail_info = self._select_thumbnail( - width, height, "crop", m_type, thumbnail_infos - ) - - t_width = thumbnail_info["thumbnail_width"] - t_height = thumbnail_info["thumbnail_height"] - t_type = thumbnail_info["thumbnail_type"] - t_method = thumbnail_info["thumbnail_method"] - t_length = thumbnail_info["thumbnail_length"] - - file_path = self.filepaths.default_thumbnail( - top_level_type, sub_type, t_width, t_height, t_type, t_method, - ) - yield respond_with_file(request, t_type, file_path, t_length) def _select_thumbnail(self, desired_width, desired_height, desired_method, desired_type, thumbnail_infos): diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py index 3868d4f65f..a4b26c2587 100644 --- a/synapse/rest/media/v1/thumbnailer.py +++ b/synapse/rest/media/v1/thumbnailer.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import PIL.Image as Image +import logging from io import BytesIO -import logging +import PIL.Image as Image logger = logging.getLogger(__name__) @@ -50,12 +50,16 @@ class Thumbnailer(object): else: return ((max_height * self.width) // self.height, max_height) - def scale(self, output_path, width, height, output_type): - """Rescales the image to the given dimensions""" + def scale(self, width, height, output_type): + """Rescales the image to the given dimensions. + + Returns: + BytesIO: the bytes of the encoded image ready to be written to disk + """ scaled = self.image.resize((width, height), Image.ANTIALIAS) - return self.save_image(scaled, output_type, output_path) + return self._encode_image(scaled, output_type) - def crop(self, output_path, width, height, output_type): + def crop(self, width, height, output_type): """Rescales and crops the image to the given dimensions preserving aspect:: (w_in / h_in) = (w_scaled / h_scaled) @@ -65,6 +69,9 @@ class Thumbnailer(object): Args: max_width: The largest possible width. max_height: The larget possible height. + + Returns: + BytesIO: the bytes of the encoded image ready to be written to disk """ if width * self.height > height * self.width: scaled_height = (width * self.height) // self.width @@ -82,13 +89,9 @@ class Thumbnailer(object): crop_left = (scaled_width - width) // 2 crop_right = width + crop_left cropped = scaled_image.crop((crop_left, 0, crop_right, height)) - return self.save_image(cropped, output_type, output_path) + return self._encode_image(cropped, output_type) - def save_image(self, output_image, output_type, output_path): + def _encode_image(self, output_image, output_type): output_bytes_io = BytesIO() output_image.save(output_bytes_io, self.FORMATS[output_type], quality=80) - output_bytes = output_bytes_io.getvalue() - with open(output_path, "wb") as output_file: - output_file.write(output_bytes) - logger.info("Stored thumbnail in file %r", output_path) - return len(output_bytes) + return output_bytes_io diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index 4ab33f73bf..9b22d204a6 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -13,16 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.http.server import respond_with_json, request_handler - -from synapse.api.errors import SynapseError +import logging -from twisted.web.server import NOT_DONE_YET from twisted.internet import defer - from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET -import logging +from synapse.api.errors import SynapseError +from synapse.http.server import respond_with_json, wrap_json_request_handler +from synapse.http.servlet import parse_string logger = logging.getLogger(__name__) @@ -40,7 +39,6 @@ class UploadResource(Resource): self.server_name = hs.hostname self.auth = hs.get_auth() self.max_upload_size = hs.config.max_upload_size - self.version_string = hs.version_string self.clock = hs.get_clock() def render_POST(self, request): @@ -51,7 +49,7 @@ class UploadResource(Resource): respond_with_json(request, 200, {}, send_cors=True) return NOT_DONE_YET - @request_handler() + @wrap_json_request_handler @defer.inlineCallbacks def _async_render_POST(self, request): requester = yield self.auth.get_user_by_req(request) @@ -68,10 +66,10 @@ class UploadResource(Resource): code=413, ) - upload_name = request.args.get("filename", None) + upload_name = parse_string(request, "filename") if upload_name: try: - upload_name = upload_name[0].decode('UTF-8') + upload_name = upload_name.decode('UTF-8') except UnicodeDecodeError: raise SynapseError( msg="Invalid UTF-8 filename parameter: %r" % (upload_name), @@ -81,19 +79,19 @@ class UploadResource(Resource): headers = request.requestHeaders if headers.hasHeader("Content-Type"): - media_type = headers.getRawHeaders("Content-Type")[0] + media_type = headers.getRawHeaders(b"Content-Type")[0] else: raise SynapseError( msg="Upload request missing 'Content-Type'", code=400, ) - # if headers.hasHeader("Content-Disposition"): - # disposition = headers.getRawHeaders("Content-Disposition")[0] + # if headers.hasHeader(b"Content-Disposition"): + # disposition = headers.getRawHeaders(b"Content-Disposition")[0] # TODO(markjh): parse content-dispostion content_uri = yield self.media_repo.create_content( - media_type, upload_name, request.content.read(), + media_type, upload_name, request.content, content_length, requester.user ) |