diff options
Diffstat (limited to 'synapse/rest/media/v1/filepath.py')
-rw-r--r-- | synapse/rest/media/v1/filepath.py | 115 |
1 files changed, 44 insertions, 71 deletions
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index 1f6441c412..c0e15c6513 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -43,75 +43,47 @@ GetPathMethod = TypeVar( ) -def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]: +def _wrap_with_jail_check(func: GetPathMethod) -> GetPathMethod: """Wraps a path-returning method to check that the returned path(s) do not escape the media store directory. - The path-returning method may return either a single path, or a list of paths. - The check is not expected to ever fail, unless `func` is missing a call to `_validate_path_component`, or `_validate_path_component` is buggy. Args: - relative: A boolean indicating whether the wrapped method returns paths relative - to the media store directory. + func: The `MediaFilePaths` method to wrap. The method may return either a single + path, or a list of paths. Returned paths may be either absolute or relative. Returns: - A method which will wrap a path-returning method, adding a check to ensure that - the returned path(s) lie within the media store directory. The check will raise - a `ValueError` if it fails. + The method, wrapped with a check to ensure that the returned path(s) lie within + the media store directory. Raises a `ValueError` if the check fails. """ - def _wrap_with_jail_check_inner(func: GetPathMethod) -> GetPathMethod: - @functools.wraps(func) - def _wrapped( - self: "MediaFilePaths", *args: Any, **kwargs: Any - ) -> Union[str, List[str]]: - path_or_paths = func(self, *args, **kwargs) - - if isinstance(path_or_paths, list): - paths_to_check = path_or_paths - else: - paths_to_check = [path_or_paths] - - for path in paths_to_check: - # Construct the path that will ultimately be used. - # We cannot guess whether `path` is relative to the media store - # directory, since the media store directory may itself be a relative - # path. - if relative: - path = os.path.join(self.base_path, path) - normalized_path = os.path.normpath(path) - - # Now that `normpath` has eliminated `../`s and `./`s from the path, - # `os.path.commonpath` can be used to check whether it lies within the - # media store directory. - if ( - os.path.commonpath([normalized_path, self.normalized_base_path]) - != self.normalized_base_path - ): - # The path resolves to outside the media store directory, - # or `self.base_path` is `.`, which is an unlikely configuration. - raise ValueError(f"Invalid media store path: {path!r}") - - # Note that `os.path.normpath`/`abspath` has a subtle caveat: - # `a/b/c/../c` will normalize to `a/b/c`, but the former refers to a - # different path if `a/b/c` is a symlink. That is, the check above is - # not perfect and may allow a certain restricted subset of untrustworthy - # paths through. Since the check above is secondary to the main - # `_validate_path_component` checks, it's less important for it to be - # perfect. - # - # As an alternative, `os.path.realpath` will resolve symlinks, but - # proves problematic if there are symlinks inside the media store. - # eg. if `url_store/` is symlinked to elsewhere, its canonical path - # won't match that of the main media store directory. - - return path_or_paths - - return cast(GetPathMethod, _wrapped) - - return _wrap_with_jail_check_inner + @functools.wraps(func) + def _wrapped( + self: "MediaFilePaths", *args: Any, **kwargs: Any + ) -> Union[str, List[str]]: + path_or_paths = func(self, *args, **kwargs) + + if isinstance(path_or_paths, list): + paths_to_check = path_or_paths + else: + paths_to_check = [path_or_paths] + + for path in paths_to_check: + # path may be an absolute or relative path, depending on the method being + # wrapped. When "appending" an absolute path, `os.path.join` discards the + # previous path, which is desired here. + normalized_path = os.path.normpath(os.path.join(self.real_base_path, path)) + if ( + os.path.commonpath([normalized_path, self.real_base_path]) + != self.real_base_path + ): + raise ValueError(f"Invalid media store path: {path!r}") + + return path_or_paths + + return cast(GetPathMethod, _wrapped) ALLOWED_CHARACTERS = set( @@ -155,7 +127,9 @@ class MediaFilePaths: def __init__(self, primary_base_path: str): self.base_path = primary_base_path - self.normalized_base_path = os.path.normpath(self.base_path) + + # The media store directory, with all symlinks resolved. + self.real_base_path = os.path.realpath(primary_base_path) # Refuse to initialize if paths cannot be validated correctly for the current # platform. @@ -166,7 +140,7 @@ class MediaFilePaths: # for certain homeservers there, since ":"s aren't allowed in paths. assert os.name == "posix" - @_wrap_with_jail_check(relative=True) + @_wrap_with_jail_check def local_media_filepath_rel(self, media_id: str) -> str: return os.path.join( "local_content", @@ -177,7 +151,7 @@ class MediaFilePaths: local_media_filepath = _wrap_in_base_path(local_media_filepath_rel) - @_wrap_with_jail_check(relative=True) + @_wrap_with_jail_check def local_media_thumbnail_rel( self, media_id: str, width: int, height: int, content_type: str, method: str ) -> str: @@ -193,7 +167,7 @@ class MediaFilePaths: local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel) - @_wrap_with_jail_check(relative=False) + @_wrap_with_jail_check def local_media_thumbnail_dir(self, media_id: str) -> str: """ Retrieve the local store path of thumbnails of a given media_id @@ -211,7 +185,7 @@ class MediaFilePaths: _validate_path_component(media_id[4:]), ) - @_wrap_with_jail_check(relative=True) + @_wrap_with_jail_check def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str: return os.path.join( "remote_content", @@ -223,7 +197,7 @@ class MediaFilePaths: remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel) - @_wrap_with_jail_check(relative=True) + @_wrap_with_jail_check def remote_media_thumbnail_rel( self, server_name: str, @@ -249,7 +223,7 @@ class MediaFilePaths: # Legacy path that was used to store thumbnails previously. # Should be removed after some time, when most of the thumbnails are stored # using the new path. - @_wrap_with_jail_check(relative=True) + @_wrap_with_jail_check def remote_media_thumbnail_rel_legacy( self, server_name: str, file_id: str, width: int, height: int, content_type: str ) -> str: @@ -264,7 +238,6 @@ class MediaFilePaths: _validate_path_component(file_name), ) - @_wrap_with_jail_check(relative=False) def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str: return os.path.join( self.base_path, @@ -275,7 +248,7 @@ class MediaFilePaths: _validate_path_component(file_id[4:]), ) - @_wrap_with_jail_check(relative=True) + @_wrap_with_jail_check def url_cache_filepath_rel(self, media_id: str) -> str: if NEW_FORMAT_ID_RE.match(media_id): # Media id is of the form <DATE><RANDOM_STRING> @@ -295,7 +268,7 @@ class MediaFilePaths: url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel) - @_wrap_with_jail_check(relative=False) + @_wrap_with_jail_check def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]: "The dirs to try and remove if we delete the media_id file" if NEW_FORMAT_ID_RE.match(media_id): @@ -317,7 +290,7 @@ class MediaFilePaths: ), ] - @_wrap_with_jail_check(relative=True) + @_wrap_with_jail_check def url_cache_thumbnail_rel( self, media_id: str, width: int, height: int, content_type: str, method: str ) -> str: @@ -345,7 +318,7 @@ class MediaFilePaths: url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel) - @_wrap_with_jail_check(relative=True) + @_wrap_with_jail_check def url_cache_thumbnail_directory_rel(self, media_id: str) -> str: # Media id is of the form <DATE><RANDOM_STRING> # E.g.: 2017-09-28-fsdRDt24DS234dsf @@ -368,7 +341,7 @@ class MediaFilePaths: url_cache_thumbnail_directory_rel ) - @_wrap_with_jail_check(relative=False) + @_wrap_with_jail_check def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]: "The dirs to try and remove if we delete the media_id thumbnails" # Media id is of the form <DATE><RANDOM_STRING> |