diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index bec77088ee..1f6441c412 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -16,7 +16,8 @@
import functools
import os
import re
-from typing import Any, Callable, List, TypeVar, cast
+import string
+from typing import Any, Callable, List, TypeVar, Union, cast
NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
@@ -37,6 +38,113 @@ def _wrap_in_base_path(func: F) -> F:
return cast(F, _wrapped)
+GetPathMethod = TypeVar(
+ "GetPathMethod", bound=Union[Callable[..., str], Callable[..., List[str]]]
+)
+
+
+def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]:
+ """Wraps a path-returning method to check that the returned path(s) do not escape
+ the media store directory.
+
+ The path-returning method may return either a single path, or a list of paths.
+
+ The check is not expected to ever fail, unless `func` is missing a call to
+ `_validate_path_component`, or `_validate_path_component` is buggy.
+
+ Args:
+ relative: A boolean indicating whether the wrapped method returns paths relative
+ to the media store directory.
+
+ Returns:
+ A method which will wrap a path-returning method, adding a check to ensure that
+ the returned path(s) lie within the media store directory. The check will raise
+ a `ValueError` if it fails.
+ """
+
+ def _wrap_with_jail_check_inner(func: GetPathMethod) -> GetPathMethod:
+ @functools.wraps(func)
+ def _wrapped(
+ self: "MediaFilePaths", *args: Any, **kwargs: Any
+ ) -> Union[str, List[str]]:
+ path_or_paths = func(self, *args, **kwargs)
+
+ if isinstance(path_or_paths, list):
+ paths_to_check = path_or_paths
+ else:
+ paths_to_check = [path_or_paths]
+
+ for path in paths_to_check:
+ # Construct the path that will ultimately be used.
+ # We cannot guess whether `path` is relative to the media store
+ # directory, since the media store directory may itself be a relative
+ # path.
+ if relative:
+ path = os.path.join(self.base_path, path)
+ normalized_path = os.path.normpath(path)
+
+ # Now that `normpath` has eliminated `../`s and `./`s from the path,
+ # `os.path.commonpath` can be used to check whether it lies within the
+ # media store directory.
+ if (
+ os.path.commonpath([normalized_path, self.normalized_base_path])
+ != self.normalized_base_path
+ ):
+ # The path resolves to outside the media store directory,
+ # or `self.base_path` is `.`, which is an unlikely configuration.
+ raise ValueError(f"Invalid media store path: {path!r}")
+
+ # Note that `os.path.normpath`/`abspath` has a subtle caveat:
+ # `a/b/c/../c` will normalize to `a/b/c`, but the former refers to a
+ # different path if `a/b/c` is a symlink. That is, the check above is
+ # not perfect and may allow a certain restricted subset of untrustworthy
+ # paths through. Since the check above is secondary to the main
+ # `_validate_path_component` checks, it's less important for it to be
+ # perfect.
+ #
+ # As an alternative, `os.path.realpath` will resolve symlinks, but
+ # proves problematic if there are symlinks inside the media store.
+ # eg. if `url_store/` is symlinked to elsewhere, its canonical path
+ # won't match that of the main media store directory.
+
+ return path_or_paths
+
+ return cast(GetPathMethod, _wrapped)
+
+ return _wrap_with_jail_check_inner
+
+
+ALLOWED_CHARACTERS = set(
+ string.ascii_letters
+ + string.digits
+ + "_-"
+ + ".[]:" # Domain names, IPv6 addresses and ports in server names
+)
+FORBIDDEN_NAMES = {
+ "",
+ os.path.curdir, # "." for the current platform
+ os.path.pardir, # ".." for the current platform
+}
+
+
+def _validate_path_component(name: str) -> str:
+ """Checks that the given string can be safely used as a path component
+
+ Args:
+ name: The path component to check.
+
+ Returns:
+ The path component if valid.
+
+ Raises:
+ ValueError: If `name` cannot be safely used as a path component.
+ """
+ if not ALLOWED_CHARACTERS.issuperset(name) or name in FORBIDDEN_NAMES:
+ raise ValueError(f"Invalid path component: {name!r}")
+
+ return name
+
+
class MediaFilePaths:
"""Describes where files are stored on disk.
@@ -47,23 +155,45 @@ class MediaFilePaths:
def __init__(self, primary_base_path: str):
self.base_path = primary_base_path
-
+ self.normalized_base_path = os.path.normpath(self.base_path)
+
+ # Refuse to initialize if paths cannot be validated correctly for the current
+ # platform.
+ assert os.path.sep not in ALLOWED_CHARACTERS
+ assert os.path.altsep not in ALLOWED_CHARACTERS
+ # On Windows, paths have all sorts of weirdness which `_validate_path_component`
+ # does not consider. In any case, the remote media store can't work correctly
+ # for certain homeservers there, since ":"s aren't allowed in paths.
+ assert os.name == "posix"
+
+ @_wrap_with_jail_check(relative=True)
def local_media_filepath_rel(self, media_id: str) -> str:
- return os.path.join("local_content", media_id[0:2], media_id[2:4], media_id[4:])
+ return os.path.join(
+ "local_content",
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
+ )
local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
+ @_wrap_with_jail_check(relative=True)
def local_media_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str:
top_level_type, sub_type = content_type.split("/")
file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
return os.path.join(
- "local_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], file_name
+ "local_thumbnails",
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
+ _validate_path_component(file_name),
)
local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
+ @_wrap_with_jail_check(relative=False)
def local_media_thumbnail_dir(self, media_id: str) -> str:
"""
Retrieve the local store path of thumbnails of a given media_id
@@ -76,18 +206,24 @@ class MediaFilePaths:
return os.path.join(
self.base_path,
"local_thumbnails",
- media_id[0:2],
- media_id[2:4],
- media_id[4:],
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
)
+ @_wrap_with_jail_check(relative=True)
def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
return os.path.join(
- "remote_content", server_name, file_id[0:2], file_id[2:4], file_id[4:]
+ "remote_content",
+ _validate_path_component(server_name),
+ _validate_path_component(file_id[0:2]),
+ _validate_path_component(file_id[2:4]),
+ _validate_path_component(file_id[4:]),
)
remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
+ @_wrap_with_jail_check(relative=True)
def remote_media_thumbnail_rel(
self,
server_name: str,
@@ -101,11 +237,11 @@ class MediaFilePaths:
file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
return os.path.join(
"remote_thumbnail",
- server_name,
- file_id[0:2],
- file_id[2:4],
- file_id[4:],
- file_name,
+ _validate_path_component(server_name),
+ _validate_path_component(file_id[0:2]),
+ _validate_path_component(file_id[2:4]),
+ _validate_path_component(file_id[4:]),
+ _validate_path_component(file_name),
)
remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel)
@@ -113,6 +249,7 @@ class MediaFilePaths:
# Legacy path that was used to store thumbnails previously.
# Should be removed after some time, when most of the thumbnails are stored
# using the new path.
+ @_wrap_with_jail_check(relative=True)
def remote_media_thumbnail_rel_legacy(
self, server_name: str, file_id: str, width: int, height: int, content_type: str
) -> str:
@@ -120,43 +257,67 @@ class MediaFilePaths:
file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type)
return os.path.join(
"remote_thumbnail",
- server_name,
- file_id[0:2],
- file_id[2:4],
- file_id[4:],
- file_name,
+ _validate_path_component(server_name),
+ _validate_path_component(file_id[0:2]),
+ _validate_path_component(file_id[2:4]),
+ _validate_path_component(file_id[4:]),
+ _validate_path_component(file_name),
)
+ @_wrap_with_jail_check(relative=False)
def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
return os.path.join(
self.base_path,
"remote_thumbnail",
- server_name,
- file_id[0:2],
- file_id[2:4],
- file_id[4:],
+ _validate_path_component(server_name),
+ _validate_path_component(file_id[0:2]),
+ _validate_path_component(file_id[2:4]),
+ _validate_path_component(file_id[4:]),
)
+ @_wrap_with_jail_check(relative=True)
def url_cache_filepath_rel(self, media_id: str) -> str:
if NEW_FORMAT_ID_RE.match(media_id):
# Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf
- return os.path.join("url_cache", media_id[:10], media_id[11:])
+ return os.path.join(
+ "url_cache",
+ _validate_path_component(media_id[:10]),
+ _validate_path_component(media_id[11:]),
+ )
else:
- return os.path.join("url_cache", media_id[0:2], media_id[2:4], media_id[4:])
+ return os.path.join(
+ "url_cache",
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
+ )
url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
+ @_wrap_with_jail_check(relative=False)
def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id file"
if NEW_FORMAT_ID_RE.match(media_id):
- return [os.path.join(self.base_path, "url_cache", media_id[:10])]
+ return [
+ os.path.join(
+ self.base_path, "url_cache", _validate_path_component(media_id[:10])
+ )
+ ]
else:
return [
- os.path.join(self.base_path, "url_cache", media_id[0:2], media_id[2:4]),
- os.path.join(self.base_path, "url_cache", media_id[0:2]),
+ os.path.join(
+ self.base_path,
+ "url_cache",
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ ),
+ os.path.join(
+ self.base_path, "url_cache", _validate_path_component(media_id[0:2])
+ ),
]
+ @_wrap_with_jail_check(relative=True)
def url_cache_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str:
@@ -168,37 +329,46 @@ class MediaFilePaths:
if NEW_FORMAT_ID_RE.match(media_id):
return os.path.join(
- "url_cache_thumbnails", media_id[:10], media_id[11:], file_name
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[:10]),
+ _validate_path_component(media_id[11:]),
+ _validate_path_component(file_name),
)
else:
return os.path.join(
"url_cache_thumbnails",
- media_id[0:2],
- media_id[2:4],
- media_id[4:],
- file_name,
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
+ _validate_path_component(file_name),
)
url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
+ @_wrap_with_jail_check(relative=True)
def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
# Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf
if NEW_FORMAT_ID_RE.match(media_id):
- return os.path.join("url_cache_thumbnails", media_id[:10], media_id[11:])
+ return os.path.join(
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[:10]),
+ _validate_path_component(media_id[11:]),
+ )
else:
return os.path.join(
"url_cache_thumbnails",
- media_id[0:2],
- media_id[2:4],
- media_id[4:],
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
)
url_cache_thumbnail_directory = _wrap_in_base_path(
url_cache_thumbnail_directory_rel
)
+ @_wrap_with_jail_check(relative=False)
def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id thumbnails"
# Media id is of the form <DATE><RANDOM_STRING>
@@ -206,21 +376,35 @@ class MediaFilePaths:
if NEW_FORMAT_ID_RE.match(media_id):
return [
os.path.join(
- self.base_path, "url_cache_thumbnails", media_id[:10], media_id[11:]
+ self.base_path,
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[:10]),
+ _validate_path_component(media_id[11:]),
+ ),
+ os.path.join(
+ self.base_path,
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[:10]),
),
- os.path.join(self.base_path, "url_cache_thumbnails", media_id[:10]),
]
else:
return [
os.path.join(
self.base_path,
"url_cache_thumbnails",
- media_id[0:2],
- media_id[2:4],
- media_id[4:],
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
),
os.path.join(
- self.base_path, "url_cache_thumbnails", media_id[0:2], media_id[2:4]
+ self.base_path,
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ ),
+ os.path.join(
+ self.base_path,
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[0:2]),
),
- os.path.join(self.base_path, "url_cache_thumbnails", media_id[0:2]),
]
|