diff --git a/changelog.d/11446.bugfix b/changelog.d/11446.bugfix
new file mode 100644
index 0000000000..fa5e055d50
--- /dev/null
+++ b/changelog.d/11446.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in 1.47.1 where the media repository would fail to work if the media store path contained any symbolic links.
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index c0e15c6513..1f6441c412 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -43,47 +43,75 @@ GetPathMethod = TypeVar(
)
-def _wrap_with_jail_check(func: GetPathMethod) -> GetPathMethod:
+def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]:
"""Wraps a path-returning method to check that the returned path(s) do not escape
the media store directory.
+ The path-returning method may return either a single path, or a list of paths.
+
The check is not expected to ever fail, unless `func` is missing a call to
`_validate_path_component`, or `_validate_path_component` is buggy.
Args:
- func: The `MediaFilePaths` method to wrap. The method may return either a single
- path, or a list of paths. Returned paths may be either absolute or relative.
+ relative: A boolean indicating whether the wrapped method returns paths relative
+ to the media store directory.
Returns:
- The method, wrapped with a check to ensure that the returned path(s) lie within
- the media store directory. Raises a `ValueError` if the check fails.
+ A method which will wrap a path-returning method, adding a check to ensure that
+ the returned path(s) lie within the media store directory. The check will raise
+ a `ValueError` if it fails.
"""
- @functools.wraps(func)
- def _wrapped(
- self: "MediaFilePaths", *args: Any, **kwargs: Any
- ) -> Union[str, List[str]]:
- path_or_paths = func(self, *args, **kwargs)
-
- if isinstance(path_or_paths, list):
- paths_to_check = path_or_paths
- else:
- paths_to_check = [path_or_paths]
-
- for path in paths_to_check:
- # path may be an absolute or relative path, depending on the method being
- # wrapped. When "appending" an absolute path, `os.path.join` discards the
- # previous path, which is desired here.
- normalized_path = os.path.normpath(os.path.join(self.real_base_path, path))
- if (
- os.path.commonpath([normalized_path, self.real_base_path])
- != self.real_base_path
- ):
- raise ValueError(f"Invalid media store path: {path!r}")
-
- return path_or_paths
-
- return cast(GetPathMethod, _wrapped)
+ def _wrap_with_jail_check_inner(func: GetPathMethod) -> GetPathMethod:
+ @functools.wraps(func)
+ def _wrapped(
+ self: "MediaFilePaths", *args: Any, **kwargs: Any
+ ) -> Union[str, List[str]]:
+ path_or_paths = func(self, *args, **kwargs)
+
+ if isinstance(path_or_paths, list):
+ paths_to_check = path_or_paths
+ else:
+ paths_to_check = [path_or_paths]
+
+ for path in paths_to_check:
+ # Construct the path that will ultimately be used.
+ # We cannot guess whether `path` is relative to the media store
+ # directory, since the media store directory may itself be a relative
+ # path.
+ if relative:
+ path = os.path.join(self.base_path, path)
+ normalized_path = os.path.normpath(path)
+
+ # Now that `normpath` has eliminated `../`s and `./`s from the path,
+ # `os.path.commonpath` can be used to check whether it lies within the
+ # media store directory.
+ if (
+ os.path.commonpath([normalized_path, self.normalized_base_path])
+ != self.normalized_base_path
+ ):
+ # The path resolves to outside the media store directory,
+ # or `self.base_path` is `.`, which is an unlikely configuration.
+ raise ValueError(f"Invalid media store path: {path!r}")
+
+ # Note that `os.path.normpath`/`abspath` has a subtle caveat:
+ # `a/b/c/../c` will normalize to `a/b/c`, but the former refers to a
+ # different path if `a/b/c` is a symlink. That is, the check above is
+ # not perfect and may allow a certain restricted subset of untrustworthy
+ # paths through. Since the check above is secondary to the main
+ # `_validate_path_component` checks, it's less important for it to be
+ # perfect.
+ #
+ # As an alternative, `os.path.realpath` will resolve symlinks, but
+ # proves problematic if there are symlinks inside the media store.
+ # eg. if `url_store/` is symlinked to elsewhere, its canonical path
+ # won't match that of the main media store directory.
+
+ return path_or_paths
+
+ return cast(GetPathMethod, _wrapped)
+
+ return _wrap_with_jail_check_inner
ALLOWED_CHARACTERS = set(
@@ -127,9 +155,7 @@ class MediaFilePaths:
def __init__(self, primary_base_path: str):
self.base_path = primary_base_path
-
- # The media store directory, with all symlinks resolved.
- self.real_base_path = os.path.realpath(primary_base_path)
+ self.normalized_base_path = os.path.normpath(self.base_path)
# Refuse to initialize if paths cannot be validated correctly for the current
# platform.
@@ -140,7 +166,7 @@ class MediaFilePaths:
# for certain homeservers there, since ":"s aren't allowed in paths.
assert os.name == "posix"
- @_wrap_with_jail_check
+ @_wrap_with_jail_check(relative=True)
def local_media_filepath_rel(self, media_id: str) -> str:
return os.path.join(
"local_content",
@@ -151,7 +177,7 @@ class MediaFilePaths:
local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
- @_wrap_with_jail_check
+ @_wrap_with_jail_check(relative=True)
def local_media_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str:
@@ -167,7 +193,7 @@ class MediaFilePaths:
local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
- @_wrap_with_jail_check
+ @_wrap_with_jail_check(relative=False)
def local_media_thumbnail_dir(self, media_id: str) -> str:
"""
Retrieve the local store path of thumbnails of a given media_id
@@ -185,7 +211,7 @@ class MediaFilePaths:
_validate_path_component(media_id[4:]),
)
- @_wrap_with_jail_check
+ @_wrap_with_jail_check(relative=True)
def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
return os.path.join(
"remote_content",
@@ -197,7 +223,7 @@ class MediaFilePaths:
remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
- @_wrap_with_jail_check
+ @_wrap_with_jail_check(relative=True)
def remote_media_thumbnail_rel(
self,
server_name: str,
@@ -223,7 +249,7 @@ class MediaFilePaths:
# Legacy path that was used to store thumbnails previously.
# Should be removed after some time, when most of the thumbnails are stored
# using the new path.
- @_wrap_with_jail_check
+ @_wrap_with_jail_check(relative=True)
def remote_media_thumbnail_rel_legacy(
self, server_name: str, file_id: str, width: int, height: int, content_type: str
) -> str:
@@ -238,6 +264,7 @@ class MediaFilePaths:
_validate_path_component(file_name),
)
+ @_wrap_with_jail_check(relative=False)
def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
return os.path.join(
self.base_path,
@@ -248,7 +275,7 @@ class MediaFilePaths:
_validate_path_component(file_id[4:]),
)
- @_wrap_with_jail_check
+ @_wrap_with_jail_check(relative=True)
def url_cache_filepath_rel(self, media_id: str) -> str:
if NEW_FORMAT_ID_RE.match(media_id):
# Media id is of the form <DATE><RANDOM_STRING>
@@ -268,7 +295,7 @@ class MediaFilePaths:
url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
- @_wrap_with_jail_check
+ @_wrap_with_jail_check(relative=False)
def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id file"
if NEW_FORMAT_ID_RE.match(media_id):
@@ -290,7 +317,7 @@ class MediaFilePaths:
),
]
- @_wrap_with_jail_check
+ @_wrap_with_jail_check(relative=True)
def url_cache_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str:
@@ -318,7 +345,7 @@ class MediaFilePaths:
url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
- @_wrap_with_jail_check
+ @_wrap_with_jail_check(relative=True)
def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
# Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf
@@ -341,7 +368,7 @@ class MediaFilePaths:
url_cache_thumbnail_directory_rel
)
- @_wrap_with_jail_check
+ @_wrap_with_jail_check(relative=False)
def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id thumbnails"
# Media id is of the form <DATE><RANDOM_STRING>
diff --git a/tests/rest/media/v1/test_filepath.py b/tests/rest/media/v1/test_filepath.py
index 8fe94f7d85..913bc530aa 100644
--- a/tests/rest/media/v1/test_filepath.py
+++ b/tests/rest/media/v1/test_filepath.py
@@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
+import os
from typing import Iterable
-from synapse.rest.media.v1.filepath import MediaFilePaths
+from synapse.rest.media.v1.filepath import MediaFilePaths, _wrap_with_jail_check
from tests import unittest
@@ -486,3 +487,109 @@ class MediaFilePathsTestCase(unittest.TestCase):
f"{value!r} unexpectedly passed validation: "
f"{method} returned {path_or_list!r}"
)
+
+
+class MediaFilePathsJailTestCase(unittest.TestCase):
+ def _check_relative_path(self, filepaths: MediaFilePaths, path: str) -> None:
+ """Passes a relative path through the jail check.
+
+ Args:
+ filepaths: The `MediaFilePaths` instance.
+ path: A path relative to the media store directory.
+
+ Raises:
+ ValueError: If the jail check fails.
+ """
+
+ @_wrap_with_jail_check(relative=True)
+ def _make_relative_path(self: MediaFilePaths, path: str) -> str:
+ return path
+
+ _make_relative_path(filepaths, path)
+
+ def _check_absolute_path(self, filepaths: MediaFilePaths, path: str) -> None:
+ """Passes an absolute path through the jail check.
+
+ Args:
+ filepaths: The `MediaFilePaths` instance.
+ path: A path relative to the media store directory.
+
+ Raises:
+ ValueError: If the jail check fails.
+ """
+
+ @_wrap_with_jail_check(relative=False)
+ def _make_absolute_path(self: MediaFilePaths, path: str) -> str:
+ return os.path.join(self.base_path, path)
+
+ _make_absolute_path(filepaths, path)
+
+ def test_traversal_inside(self) -> None:
+ """Test the jail check for paths that stay within the media directory."""
+ # Despite the `../`s, these paths still lie within the media directory and it's
+ # expected for the jail check to allow them through.
+ # These paths ought to trip the other checks in place and should never be
+ # returned.
+ filepaths = MediaFilePaths("/media_store")
+ path = "url_cache/2020-01-02/../../GerZNDnDZVjsOtar"
+ self._check_relative_path(filepaths, path)
+ self._check_absolute_path(filepaths, path)
+
+ def test_traversal_outside(self) -> None:
+ """Test that the jail check fails for paths that escape the media directory."""
+ filepaths = MediaFilePaths("/media_store")
+ path = "url_cache/2020-01-02/../../../GerZNDnDZVjsOtar"
+ with self.assertRaises(ValueError):
+ self._check_relative_path(filepaths, path)
+ with self.assertRaises(ValueError):
+ self._check_absolute_path(filepaths, path)
+
+ def test_traversal_reentry(self) -> None:
+ """Test the jail check for paths that exit and re-enter the media directory."""
+ # These paths lie outside the media directory if it is a symlink, and inside
+ # otherwise. Ideally the check should fail, but this proves difficult.
+ # This test documents the behaviour for this edge case.
+ # These paths ought to trip the other checks in place and should never be
+ # returned.
+ filepaths = MediaFilePaths("/media_store")
+ path = "url_cache/2020-01-02/../../../media_store/GerZNDnDZVjsOtar"
+ self._check_relative_path(filepaths, path)
+ self._check_absolute_path(filepaths, path)
+
+ def test_symlink(self) -> None:
+ """Test that a symlink does not cause the jail check to fail."""
+ media_store_path = self.mktemp()
+
+ # symlink the media store directory
+ os.symlink("/mnt/synapse/media_store", media_store_path)
+
+ # Test that relative and absolute paths don't trip the check
+ # NB: `media_store_path` is a relative path
+ filepaths = MediaFilePaths(media_store_path)
+ self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
+ self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
+
+ filepaths = MediaFilePaths(os.path.abspath(media_store_path))
+ self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
+ self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
+
+ def test_symlink_subdirectory(self) -> None:
+ """Test that a symlinked subdirectory does not cause the jail check to fail."""
+ media_store_path = self.mktemp()
+ os.mkdir(media_store_path)
+
+ # symlink `url_cache/`
+ os.symlink(
+ "/mnt/synapse/media_store_url_cache",
+ os.path.join(media_store_path, "url_cache"),
+ )
+
+ # Test that relative and absolute paths don't trip the check
+ # NB: `media_store_path` is a relative path
+ filepaths = MediaFilePaths(media_store_path)
+ self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
+ self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
+
+ filepaths = MediaFilePaths(os.path.abspath(media_store_path))
+ self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
+ self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
|