diff options
author | Sean Quah <8349537+squahtx@users.noreply.github.com> | 2021-12-02 16:05:24 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-02 16:05:24 +0000 |
commit | 858d80bf0f9f656a03992794874081b806e49222 (patch) | |
tree | b6be5ccb0c49765e9b0ccdd94122b63562aab839 /synapse/rest/media | |
parent | Add type annotations to `tests.storage.test_appservice`. (#11488) (diff) | |
download | synapse-858d80bf0f9f656a03992794874081b806e49222.tar.xz |
Fix media repository failing when media store path contains symlinks (#11446)
Diffstat (limited to 'synapse/rest/media')
-rw-r--r-- | synapse/rest/media/v1/filepath.py | 115 |
1 files changed, 71 insertions, 44 deletions
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index c0e15c6513..1f6441c412 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -43,47 +43,75 @@ GetPathMethod = TypeVar( ) -def _wrap_with_jail_check(func: GetPathMethod) -> GetPathMethod: +def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]: """Wraps a path-returning method to check that the returned path(s) do not escape the media store directory. + The path-returning method may return either a single path, or a list of paths. + The check is not expected to ever fail, unless `func` is missing a call to `_validate_path_component`, or `_validate_path_component` is buggy. Args: - func: The `MediaFilePaths` method to wrap. The method may return either a single - path, or a list of paths. Returned paths may be either absolute or relative. + relative: A boolean indicating whether the wrapped method returns paths relative + to the media store directory. Returns: - The method, wrapped with a check to ensure that the returned path(s) lie within - the media store directory. Raises a `ValueError` if the check fails. + A method which will wrap a path-returning method, adding a check to ensure that + the returned path(s) lie within the media store directory. The check will raise + a `ValueError` if it fails. """ - @functools.wraps(func) - def _wrapped( - self: "MediaFilePaths", *args: Any, **kwargs: Any - ) -> Union[str, List[str]]: - path_or_paths = func(self, *args, **kwargs) - - if isinstance(path_or_paths, list): - paths_to_check = path_or_paths - else: - paths_to_check = [path_or_paths] - - for path in paths_to_check: - # path may be an absolute or relative path, depending on the method being - # wrapped. When "appending" an absolute path, `os.path.join` discards the - # previous path, which is desired here. - normalized_path = os.path.normpath(os.path.join(self.real_base_path, path)) - if ( - os.path.commonpath([normalized_path, self.real_base_path]) - != self.real_base_path - ): - raise ValueError(f"Invalid media store path: {path!r}") - - return path_or_paths - - return cast(GetPathMethod, _wrapped) + def _wrap_with_jail_check_inner(func: GetPathMethod) -> GetPathMethod: + @functools.wraps(func) + def _wrapped( + self: "MediaFilePaths", *args: Any, **kwargs: Any + ) -> Union[str, List[str]]: + path_or_paths = func(self, *args, **kwargs) + + if isinstance(path_or_paths, list): + paths_to_check = path_or_paths + else: + paths_to_check = [path_or_paths] + + for path in paths_to_check: + # Construct the path that will ultimately be used. + # We cannot guess whether `path` is relative to the media store + # directory, since the media store directory may itself be a relative + # path. + if relative: + path = os.path.join(self.base_path, path) + normalized_path = os.path.normpath(path) + + # Now that `normpath` has eliminated `../`s and `./`s from the path, + # `os.path.commonpath` can be used to check whether it lies within the + # media store directory. + if ( + os.path.commonpath([normalized_path, self.normalized_base_path]) + != self.normalized_base_path + ): + # The path resolves to outside the media store directory, + # or `self.base_path` is `.`, which is an unlikely configuration. + raise ValueError(f"Invalid media store path: {path!r}") + + # Note that `os.path.normpath`/`abspath` has a subtle caveat: + # `a/b/c/../c` will normalize to `a/b/c`, but the former refers to a + # different path if `a/b/c` is a symlink. That is, the check above is + # not perfect and may allow a certain restricted subset of untrustworthy + # paths through. Since the check above is secondary to the main + # `_validate_path_component` checks, it's less important for it to be + # perfect. + # + # As an alternative, `os.path.realpath` will resolve symlinks, but + # proves problematic if there are symlinks inside the media store. + # eg. if `url_store/` is symlinked to elsewhere, its canonical path + # won't match that of the main media store directory. + + return path_or_paths + + return cast(GetPathMethod, _wrapped) + + return _wrap_with_jail_check_inner ALLOWED_CHARACTERS = set( @@ -127,9 +155,7 @@ class MediaFilePaths: def __init__(self, primary_base_path: str): self.base_path = primary_base_path - - # The media store directory, with all symlinks resolved. - self.real_base_path = os.path.realpath(primary_base_path) + self.normalized_base_path = os.path.normpath(self.base_path) # Refuse to initialize if paths cannot be validated correctly for the current # platform. @@ -140,7 +166,7 @@ class MediaFilePaths: # for certain homeservers there, since ":"s aren't allowed in paths. assert os.name == "posix" - @_wrap_with_jail_check + @_wrap_with_jail_check(relative=True) def local_media_filepath_rel(self, media_id: str) -> str: return os.path.join( "local_content", @@ -151,7 +177,7 @@ class MediaFilePaths: local_media_filepath = _wrap_in_base_path(local_media_filepath_rel) - @_wrap_with_jail_check + @_wrap_with_jail_check(relative=True) def local_media_thumbnail_rel( self, media_id: str, width: int, height: int, content_type: str, method: str ) -> str: @@ -167,7 +193,7 @@ class MediaFilePaths: local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel) - @_wrap_with_jail_check + @_wrap_with_jail_check(relative=False) def local_media_thumbnail_dir(self, media_id: str) -> str: """ Retrieve the local store path of thumbnails of a given media_id @@ -185,7 +211,7 @@ class MediaFilePaths: _validate_path_component(media_id[4:]), ) - @_wrap_with_jail_check + @_wrap_with_jail_check(relative=True) def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str: return os.path.join( "remote_content", @@ -197,7 +223,7 @@ class MediaFilePaths: remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel) - @_wrap_with_jail_check + @_wrap_with_jail_check(relative=True) def remote_media_thumbnail_rel( self, server_name: str, @@ -223,7 +249,7 @@ class MediaFilePaths: # Legacy path that was used to store thumbnails previously. # Should be removed after some time, when most of the thumbnails are stored # using the new path. - @_wrap_with_jail_check + @_wrap_with_jail_check(relative=True) def remote_media_thumbnail_rel_legacy( self, server_name: str, file_id: str, width: int, height: int, content_type: str ) -> str: @@ -238,6 +264,7 @@ class MediaFilePaths: _validate_path_component(file_name), ) + @_wrap_with_jail_check(relative=False) def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str: return os.path.join( self.base_path, @@ -248,7 +275,7 @@ class MediaFilePaths: _validate_path_component(file_id[4:]), ) - @_wrap_with_jail_check + @_wrap_with_jail_check(relative=True) def url_cache_filepath_rel(self, media_id: str) -> str: if NEW_FORMAT_ID_RE.match(media_id): # Media id is of the form <DATE><RANDOM_STRING> @@ -268,7 +295,7 @@ class MediaFilePaths: url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel) - @_wrap_with_jail_check + @_wrap_with_jail_check(relative=False) def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]: "The dirs to try and remove if we delete the media_id file" if NEW_FORMAT_ID_RE.match(media_id): @@ -290,7 +317,7 @@ class MediaFilePaths: ), ] - @_wrap_with_jail_check + @_wrap_with_jail_check(relative=True) def url_cache_thumbnail_rel( self, media_id: str, width: int, height: int, content_type: str, method: str ) -> str: @@ -318,7 +345,7 @@ class MediaFilePaths: url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel) - @_wrap_with_jail_check + @_wrap_with_jail_check(relative=True) def url_cache_thumbnail_directory_rel(self, media_id: str) -> str: # Media id is of the form <DATE><RANDOM_STRING> # E.g.: 2017-09-28-fsdRDt24DS234dsf @@ -341,7 +368,7 @@ class MediaFilePaths: url_cache_thumbnail_directory_rel ) - @_wrap_with_jail_check + @_wrap_with_jail_check(relative=False) def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]: "The dirs to try and remove if we delete the media_id thumbnails" # Media id is of the form <DATE><RANDOM_STRING> |