From 4fc8875876374ec8f97a3b3cc344a4e3abcf769f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 27 Feb 2023 08:26:05 -0500 Subject: Refactor media modules. (#15146) * Removes the `v1` directory from `test.rest.media.v1`. * Moves the non-REST code from `synapse.rest.media.v1` to `synapse.media`. * Flatten the `v1` directory from `synapse.rest.media`, but leave compatiblity with 3rd party media repositories and spam checkers. --- synapse/rest/media/v1/filepath.py | 410 -------------------------------------- 1 file changed, 410 deletions(-) delete mode 100644 synapse/rest/media/v1/filepath.py (limited to 'synapse/rest/media/v1/filepath.py') diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py deleted file mode 100644 index 1f6441c412..0000000000 --- a/synapse/rest/media/v1/filepath.py +++ /dev/null @@ -1,410 +0,0 @@ -# Copyright 2014-2016 OpenMarket Ltd -# Copyright 2020-2021 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import functools -import os -import re -import string -from typing import Any, Callable, List, TypeVar, Union, cast - -NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d") - - -F = TypeVar("F", bound=Callable[..., str]) - - -def _wrap_in_base_path(func: F) -> F: - """Takes a function that returns a relative path and turns it into an - absolute path based on the location of the primary media store - """ - - @functools.wraps(func) - def _wrapped(self: "MediaFilePaths", *args: Any, **kwargs: Any) -> str: - path = func(self, *args, **kwargs) - return os.path.join(self.base_path, path) - - return cast(F, _wrapped) - - -GetPathMethod = TypeVar( - "GetPathMethod", bound=Union[Callable[..., str], Callable[..., List[str]]] -) - - -def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]: - """Wraps a path-returning method to check that the returned path(s) do not escape - the media store directory. - - The path-returning method may return either a single path, or a list of paths. - - The check is not expected to ever fail, unless `func` is missing a call to - `_validate_path_component`, or `_validate_path_component` is buggy. - - Args: - relative: A boolean indicating whether the wrapped method returns paths relative - to the media store directory. - - Returns: - A method which will wrap a path-returning method, adding a check to ensure that - the returned path(s) lie within the media store directory. The check will raise - a `ValueError` if it fails. - """ - - def _wrap_with_jail_check_inner(func: GetPathMethod) -> GetPathMethod: - @functools.wraps(func) - def _wrapped( - self: "MediaFilePaths", *args: Any, **kwargs: Any - ) -> Union[str, List[str]]: - path_or_paths = func(self, *args, **kwargs) - - if isinstance(path_or_paths, list): - paths_to_check = path_or_paths - else: - paths_to_check = [path_or_paths] - - for path in paths_to_check: - # Construct the path that will ultimately be used. - # We cannot guess whether `path` is relative to the media store - # directory, since the media store directory may itself be a relative - # path. - if relative: - path = os.path.join(self.base_path, path) - normalized_path = os.path.normpath(path) - - # Now that `normpath` has eliminated `../`s and `./`s from the path, - # `os.path.commonpath` can be used to check whether it lies within the - # media store directory. - if ( - os.path.commonpath([normalized_path, self.normalized_base_path]) - != self.normalized_base_path - ): - # The path resolves to outside the media store directory, - # or `self.base_path` is `.`, which is an unlikely configuration. - raise ValueError(f"Invalid media store path: {path!r}") - - # Note that `os.path.normpath`/`abspath` has a subtle caveat: - # `a/b/c/../c` will normalize to `a/b/c`, but the former refers to a - # different path if `a/b/c` is a symlink. That is, the check above is - # not perfect and may allow a certain restricted subset of untrustworthy - # paths through. Since the check above is secondary to the main - # `_validate_path_component` checks, it's less important for it to be - # perfect. - # - # As an alternative, `os.path.realpath` will resolve symlinks, but - # proves problematic if there are symlinks inside the media store. - # eg. if `url_store/` is symlinked to elsewhere, its canonical path - # won't match that of the main media store directory. - - return path_or_paths - - return cast(GetPathMethod, _wrapped) - - return _wrap_with_jail_check_inner - - -ALLOWED_CHARACTERS = set( - string.ascii_letters - + string.digits - + "_-" - + ".[]:" # Domain names, IPv6 addresses and ports in server names -) -FORBIDDEN_NAMES = { - "", - os.path.curdir, # "." for the current platform - os.path.pardir, # ".." for the current platform -} - - -def _validate_path_component(name: str) -> str: - """Checks that the given string can be safely used as a path component - - Args: - name: The path component to check. - - Returns: - The path component if valid. - - Raises: - ValueError: If `name` cannot be safely used as a path component. - """ - if not ALLOWED_CHARACTERS.issuperset(name) or name in FORBIDDEN_NAMES: - raise ValueError(f"Invalid path component: {name!r}") - - return name - - -class MediaFilePaths: - """Describes where files are stored on disk. - - Most of the functions have a `*_rel` variant which returns a file path that - is relative to the base media store path. This is mainly used when we want - to write to the backup media store (when one is configured) - """ - - def __init__(self, primary_base_path: str): - self.base_path = primary_base_path - self.normalized_base_path = os.path.normpath(self.base_path) - - # Refuse to initialize if paths cannot be validated correctly for the current - # platform. - assert os.path.sep not in ALLOWED_CHARACTERS - assert os.path.altsep not in ALLOWED_CHARACTERS - # On Windows, paths have all sorts of weirdness which `_validate_path_component` - # does not consider. In any case, the remote media store can't work correctly - # for certain homeservers there, since ":"s aren't allowed in paths. - assert os.name == "posix" - - @_wrap_with_jail_check(relative=True) - def local_media_filepath_rel(self, media_id: str) -> str: - return os.path.join( - "local_content", - _validate_path_component(media_id[0:2]), - _validate_path_component(media_id[2:4]), - _validate_path_component(media_id[4:]), - ) - - local_media_filepath = _wrap_in_base_path(local_media_filepath_rel) - - @_wrap_with_jail_check(relative=True) - def local_media_thumbnail_rel( - self, media_id: str, width: int, height: int, content_type: str, method: str - ) -> str: - top_level_type, sub_type = content_type.split("/") - file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method) - return os.path.join( - "local_thumbnails", - _validate_path_component(media_id[0:2]), - _validate_path_component(media_id[2:4]), - _validate_path_component(media_id[4:]), - _validate_path_component(file_name), - ) - - local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel) - - @_wrap_with_jail_check(relative=False) - def local_media_thumbnail_dir(self, media_id: str) -> str: - """ - Retrieve the local store path of thumbnails of a given media_id - - Args: - media_id: The media ID to query. - Returns: - Path of local_thumbnails from media_id - """ - return os.path.join( - self.base_path, - "local_thumbnails", - _validate_path_component(media_id[0:2]), - _validate_path_component(media_id[2:4]), - _validate_path_component(media_id[4:]), - ) - - @_wrap_with_jail_check(relative=True) - def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str: - return os.path.join( - "remote_content", - _validate_path_component(server_name), - _validate_path_component(file_id[0:2]), - _validate_path_component(file_id[2:4]), - _validate_path_component(file_id[4:]), - ) - - remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel) - - @_wrap_with_jail_check(relative=True) - def remote_media_thumbnail_rel( - self, - server_name: str, - file_id: str, - width: int, - height: int, - content_type: str, - method: str, - ) -> str: - top_level_type, sub_type = content_type.split("/") - file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method) - return os.path.join( - "remote_thumbnail", - _validate_path_component(server_name), - _validate_path_component(file_id[0:2]), - _validate_path_component(file_id[2:4]), - _validate_path_component(file_id[4:]), - _validate_path_component(file_name), - ) - - remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel) - - # Legacy path that was used to store thumbnails previously. - # Should be removed after some time, when most of the thumbnails are stored - # using the new path. - @_wrap_with_jail_check(relative=True) - def remote_media_thumbnail_rel_legacy( - self, server_name: str, file_id: str, width: int, height: int, content_type: str - ) -> str: - top_level_type, sub_type = content_type.split("/") - file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type) - return os.path.join( - "remote_thumbnail", - _validate_path_component(server_name), - _validate_path_component(file_id[0:2]), - _validate_path_component(file_id[2:4]), - _validate_path_component(file_id[4:]), - _validate_path_component(file_name), - ) - - @_wrap_with_jail_check(relative=False) - def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str: - return os.path.join( - self.base_path, - "remote_thumbnail", - _validate_path_component(server_name), - _validate_path_component(file_id[0:2]), - _validate_path_component(file_id[2:4]), - _validate_path_component(file_id[4:]), - ) - - @_wrap_with_jail_check(relative=True) - def url_cache_filepath_rel(self, media_id: str) -> str: - if NEW_FORMAT_ID_RE.match(media_id): - # Media id is of the form - # E.g.: 2017-09-28-fsdRDt24DS234dsf - return os.path.join( - "url_cache", - _validate_path_component(media_id[:10]), - _validate_path_component(media_id[11:]), - ) - else: - return os.path.join( - "url_cache", - _validate_path_component(media_id[0:2]), - _validate_path_component(media_id[2:4]), - _validate_path_component(media_id[4:]), - ) - - url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel) - - @_wrap_with_jail_check(relative=False) - def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]: - "The dirs to try and remove if we delete the media_id file" - if NEW_FORMAT_ID_RE.match(media_id): - return [ - os.path.join( - self.base_path, "url_cache", _validate_path_component(media_id[:10]) - ) - ] - else: - return [ - os.path.join( - self.base_path, - "url_cache", - _validate_path_component(media_id[0:2]), - _validate_path_component(media_id[2:4]), - ), - os.path.join( - self.base_path, "url_cache", _validate_path_component(media_id[0:2]) - ), - ] - - @_wrap_with_jail_check(relative=True) - def url_cache_thumbnail_rel( - self, media_id: str, width: int, height: int, content_type: str, method: str - ) -> str: - # Media id is of the form - # E.g.: 2017-09-28-fsdRDt24DS234dsf - - top_level_type, sub_type = content_type.split("/") - file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method) - - if NEW_FORMAT_ID_RE.match(media_id): - return os.path.join( - "url_cache_thumbnails", - _validate_path_component(media_id[:10]), - _validate_path_component(media_id[11:]), - _validate_path_component(file_name), - ) - else: - return os.path.join( - "url_cache_thumbnails", - _validate_path_component(media_id[0:2]), - _validate_path_component(media_id[2:4]), - _validate_path_component(media_id[4:]), - _validate_path_component(file_name), - ) - - url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel) - - @_wrap_with_jail_check(relative=True) - def url_cache_thumbnail_directory_rel(self, media_id: str) -> str: - # Media id is of the form - # E.g.: 2017-09-28-fsdRDt24DS234dsf - - if NEW_FORMAT_ID_RE.match(media_id): - return os.path.join( - "url_cache_thumbnails", - _validate_path_component(media_id[:10]), - _validate_path_component(media_id[11:]), - ) - else: - return os.path.join( - "url_cache_thumbnails", - _validate_path_component(media_id[0:2]), - _validate_path_component(media_id[2:4]), - _validate_path_component(media_id[4:]), - ) - - url_cache_thumbnail_directory = _wrap_in_base_path( - url_cache_thumbnail_directory_rel - ) - - @_wrap_with_jail_check(relative=False) - def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]: - "The dirs to try and remove if we delete the media_id thumbnails" - # Media id is of the form - # E.g.: 2017-09-28-fsdRDt24DS234dsf - if NEW_FORMAT_ID_RE.match(media_id): - return [ - os.path.join( - self.base_path, - "url_cache_thumbnails", - _validate_path_component(media_id[:10]), - _validate_path_component(media_id[11:]), - ), - os.path.join( - self.base_path, - "url_cache_thumbnails", - _validate_path_component(media_id[:10]), - ), - ] - else: - return [ - os.path.join( - self.base_path, - "url_cache_thumbnails", - _validate_path_component(media_id[0:2]), - _validate_path_component(media_id[2:4]), - _validate_path_component(media_id[4:]), - ), - os.path.join( - self.base_path, - "url_cache_thumbnails", - _validate_path_component(media_id[0:2]), - _validate_path_component(media_id[2:4]), - ), - os.path.join( - self.base_path, - "url_cache_thumbnails", - _validate_path_component(media_id[0:2]), - ), - ] -- cgit 1.4.1