summary refs log tree commit diff
path: root/synapse/rest/media/v1/filepath.py
diff options
context:
space:
mode:
authorSean Quah <seanq@element.io>2021-11-19 13:39:15 +0000
committerSean Quah <seanq@element.io>2021-11-19 13:39:15 +0000
commit91f2bd0907f1d05af67166846988e49644eb650c (patch)
tree1e1cb3881d0cf97b603e6a73f88af340ba491f58 /synapse/rest/media/v1/filepath.py
parent1.47.0 (diff)
downloadsynapse-91f2bd0907f1d05af67166846988e49644eb650c.tar.xz
Prevent the media store from writing outside of the configured directory
Also tighten validation of server names by forbidding invalid characters
in IPv6 addresses and empty domain labels.
Diffstat (limited to 'synapse/rest/media/v1/filepath.py')
-rw-r--r--synapse/rest/media/v1/filepath.py241
1 files changed, 199 insertions, 42 deletions
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index bec77088ee..c0e15c6513 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -16,7 +16,8 @@
 import functools
 import os
 import re
-from typing import Any, Callable, List, TypeVar, cast
+import string
+from typing import Any, Callable, List, TypeVar, Union, cast
 
 NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
 
@@ -37,6 +38,85 @@ def _wrap_in_base_path(func: F) -> F:
     return cast(F, _wrapped)
 
 
+GetPathMethod = TypeVar(
+    "GetPathMethod", bound=Union[Callable[..., str], Callable[..., List[str]]]
+)
+
+
+def _wrap_with_jail_check(func: GetPathMethod) -> GetPathMethod:
+    """Wraps a path-returning method to check that the returned path(s) do not escape
+    the media store directory.
+
+    The check is not expected to ever fail, unless `func` is missing a call to
+    `_validate_path_component`, or `_validate_path_component` is buggy.
+
+    Args:
+        func: The `MediaFilePaths` method to wrap. The method may return either a single
+            path, or a list of paths. Returned paths may be either absolute or relative.
+
+    Returns:
+        The method, wrapped with a check to ensure that the returned path(s) lie within
+        the media store directory. Raises a `ValueError` if the check fails.
+    """
+
+    @functools.wraps(func)
+    def _wrapped(
+        self: "MediaFilePaths", *args: Any, **kwargs: Any
+    ) -> Union[str, List[str]]:
+        path_or_paths = func(self, *args, **kwargs)
+
+        if isinstance(path_or_paths, list):
+            paths_to_check = path_or_paths
+        else:
+            paths_to_check = [path_or_paths]
+
+        for path in paths_to_check:
+            # path may be an absolute or relative path, depending on the method being
+            # wrapped. When "appending" an absolute path, `os.path.join` discards the
+            # previous path, which is desired here.
+            normalized_path = os.path.normpath(os.path.join(self.real_base_path, path))
+            if (
+                os.path.commonpath([normalized_path, self.real_base_path])
+                != self.real_base_path
+            ):
+                raise ValueError(f"Invalid media store path: {path!r}")
+
+        return path_or_paths
+
+    return cast(GetPathMethod, _wrapped)
+
+
+ALLOWED_CHARACTERS = set(
+    string.ascii_letters
+    + string.digits
+    + "_-"
+    + ".[]:"  # Domain names, IPv6 addresses and ports in server names
+)
+FORBIDDEN_NAMES = {
+    "",
+    os.path.curdir,  # "." for the current platform
+    os.path.pardir,  # ".." for the current platform
+}
+
+
+def _validate_path_component(name: str) -> str:
+    """Checks that the given string can be safely used as a path component
+
+    Args:
+        name: The path component to check.
+
+    Returns:
+        The path component if valid.
+
+    Raises:
+        ValueError: If `name` cannot be safely used as a path component.
+    """
+    if not ALLOWED_CHARACTERS.issuperset(name) or name in FORBIDDEN_NAMES:
+        raise ValueError(f"Invalid path component: {name!r}")
+
+    return name
+
+
 class MediaFilePaths:
     """Describes where files are stored on disk.
 
@@ -48,22 +128,46 @@ class MediaFilePaths:
     def __init__(self, primary_base_path: str):
         self.base_path = primary_base_path
 
+        # The media store directory, with all symlinks resolved.
+        self.real_base_path = os.path.realpath(primary_base_path)
+
+        # Refuse to initialize if paths cannot be validated correctly for the current
+        # platform.
+        assert os.path.sep not in ALLOWED_CHARACTERS
+        assert os.path.altsep not in ALLOWED_CHARACTERS
+        # On Windows, paths have all sorts of weirdness which `_validate_path_component`
+        # does not consider. In any case, the remote media store can't work correctly
+        # for certain homeservers there, since ":"s aren't allowed in paths.
+        assert os.name == "posix"
+
+    @_wrap_with_jail_check
     def local_media_filepath_rel(self, media_id: str) -> str:
-        return os.path.join("local_content", media_id[0:2], media_id[2:4], media_id[4:])
+        return os.path.join(
+            "local_content",
+            _validate_path_component(media_id[0:2]),
+            _validate_path_component(media_id[2:4]),
+            _validate_path_component(media_id[4:]),
+        )
 
     local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
 
+    @_wrap_with_jail_check
     def local_media_thumbnail_rel(
         self, media_id: str, width: int, height: int, content_type: str, method: str
     ) -> str:
         top_level_type, sub_type = content_type.split("/")
         file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
         return os.path.join(
-            "local_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], file_name
+            "local_thumbnails",
+            _validate_path_component(media_id[0:2]),
+            _validate_path_component(media_id[2:4]),
+            _validate_path_component(media_id[4:]),
+            _validate_path_component(file_name),
         )
 
     local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
 
+    @_wrap_with_jail_check
     def local_media_thumbnail_dir(self, media_id: str) -> str:
         """
         Retrieve the local store path of thumbnails of a given media_id
@@ -76,18 +180,24 @@ class MediaFilePaths:
         return os.path.join(
             self.base_path,
             "local_thumbnails",
-            media_id[0:2],
-            media_id[2:4],
-            media_id[4:],
+            _validate_path_component(media_id[0:2]),
+            _validate_path_component(media_id[2:4]),
+            _validate_path_component(media_id[4:]),
         )
 
+    @_wrap_with_jail_check
     def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
         return os.path.join(
-            "remote_content", server_name, file_id[0:2], file_id[2:4], file_id[4:]
+            "remote_content",
+            _validate_path_component(server_name),
+            _validate_path_component(file_id[0:2]),
+            _validate_path_component(file_id[2:4]),
+            _validate_path_component(file_id[4:]),
         )
 
     remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
 
+    @_wrap_with_jail_check
     def remote_media_thumbnail_rel(
         self,
         server_name: str,
@@ -101,11 +211,11 @@ class MediaFilePaths:
         file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
         return os.path.join(
             "remote_thumbnail",
-            server_name,
-            file_id[0:2],
-            file_id[2:4],
-            file_id[4:],
-            file_name,
+            _validate_path_component(server_name),
+            _validate_path_component(file_id[0:2]),
+            _validate_path_component(file_id[2:4]),
+            _validate_path_component(file_id[4:]),
+            _validate_path_component(file_name),
         )
 
     remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel)
@@ -113,6 +223,7 @@ class MediaFilePaths:
     # Legacy path that was used to store thumbnails previously.
     # Should be removed after some time, when most of the thumbnails are stored
     # using the new path.
+    @_wrap_with_jail_check
     def remote_media_thumbnail_rel_legacy(
         self, server_name: str, file_id: str, width: int, height: int, content_type: str
     ) -> str:
@@ -120,43 +231,66 @@ class MediaFilePaths:
         file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type)
         return os.path.join(
             "remote_thumbnail",
-            server_name,
-            file_id[0:2],
-            file_id[2:4],
-            file_id[4:],
-            file_name,
+            _validate_path_component(server_name),
+            _validate_path_component(file_id[0:2]),
+            _validate_path_component(file_id[2:4]),
+            _validate_path_component(file_id[4:]),
+            _validate_path_component(file_name),
         )
 
     def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
         return os.path.join(
             self.base_path,
             "remote_thumbnail",
-            server_name,
-            file_id[0:2],
-            file_id[2:4],
-            file_id[4:],
+            _validate_path_component(server_name),
+            _validate_path_component(file_id[0:2]),
+            _validate_path_component(file_id[2:4]),
+            _validate_path_component(file_id[4:]),
         )
 
+    @_wrap_with_jail_check
     def url_cache_filepath_rel(self, media_id: str) -> str:
         if NEW_FORMAT_ID_RE.match(media_id):
             # Media id is of the form <DATE><RANDOM_STRING>
             # E.g.: 2017-09-28-fsdRDt24DS234dsf
-            return os.path.join("url_cache", media_id[:10], media_id[11:])
+            return os.path.join(
+                "url_cache",
+                _validate_path_component(media_id[:10]),
+                _validate_path_component(media_id[11:]),
+            )
         else:
-            return os.path.join("url_cache", media_id[0:2], media_id[2:4], media_id[4:])
+            return os.path.join(
+                "url_cache",
+                _validate_path_component(media_id[0:2]),
+                _validate_path_component(media_id[2:4]),
+                _validate_path_component(media_id[4:]),
+            )
 
     url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
 
+    @_wrap_with_jail_check
     def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
         "The dirs to try and remove if we delete the media_id file"
         if NEW_FORMAT_ID_RE.match(media_id):
-            return [os.path.join(self.base_path, "url_cache", media_id[:10])]
+            return [
+                os.path.join(
+                    self.base_path, "url_cache", _validate_path_component(media_id[:10])
+                )
+            ]
         else:
             return [
-                os.path.join(self.base_path, "url_cache", media_id[0:2], media_id[2:4]),
-                os.path.join(self.base_path, "url_cache", media_id[0:2]),
+                os.path.join(
+                    self.base_path,
+                    "url_cache",
+                    _validate_path_component(media_id[0:2]),
+                    _validate_path_component(media_id[2:4]),
+                ),
+                os.path.join(
+                    self.base_path, "url_cache", _validate_path_component(media_id[0:2])
+                ),
             ]
 
+    @_wrap_with_jail_check
     def url_cache_thumbnail_rel(
         self, media_id: str, width: int, height: int, content_type: str, method: str
     ) -> str:
@@ -168,37 +302,46 @@ class MediaFilePaths:
 
         if NEW_FORMAT_ID_RE.match(media_id):
             return os.path.join(
-                "url_cache_thumbnails", media_id[:10], media_id[11:], file_name
+                "url_cache_thumbnails",
+                _validate_path_component(media_id[:10]),
+                _validate_path_component(media_id[11:]),
+                _validate_path_component(file_name),
             )
         else:
             return os.path.join(
                 "url_cache_thumbnails",
-                media_id[0:2],
-                media_id[2:4],
-                media_id[4:],
-                file_name,
+                _validate_path_component(media_id[0:2]),
+                _validate_path_component(media_id[2:4]),
+                _validate_path_component(media_id[4:]),
+                _validate_path_component(file_name),
             )
 
     url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
 
+    @_wrap_with_jail_check
     def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
         # Media id is of the form <DATE><RANDOM_STRING>
         # E.g.: 2017-09-28-fsdRDt24DS234dsf
 
         if NEW_FORMAT_ID_RE.match(media_id):
-            return os.path.join("url_cache_thumbnails", media_id[:10], media_id[11:])
+            return os.path.join(
+                "url_cache_thumbnails",
+                _validate_path_component(media_id[:10]),
+                _validate_path_component(media_id[11:]),
+            )
         else:
             return os.path.join(
                 "url_cache_thumbnails",
-                media_id[0:2],
-                media_id[2:4],
-                media_id[4:],
+                _validate_path_component(media_id[0:2]),
+                _validate_path_component(media_id[2:4]),
+                _validate_path_component(media_id[4:]),
             )
 
     url_cache_thumbnail_directory = _wrap_in_base_path(
         url_cache_thumbnail_directory_rel
     )
 
+    @_wrap_with_jail_check
     def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
         "The dirs to try and remove if we delete the media_id thumbnails"
         # Media id is of the form <DATE><RANDOM_STRING>
@@ -206,21 +349,35 @@ class MediaFilePaths:
         if NEW_FORMAT_ID_RE.match(media_id):
             return [
                 os.path.join(
-                    self.base_path, "url_cache_thumbnails", media_id[:10], media_id[11:]
+                    self.base_path,
+                    "url_cache_thumbnails",
+                    _validate_path_component(media_id[:10]),
+                    _validate_path_component(media_id[11:]),
+                ),
+                os.path.join(
+                    self.base_path,
+                    "url_cache_thumbnails",
+                    _validate_path_component(media_id[:10]),
                 ),
-                os.path.join(self.base_path, "url_cache_thumbnails", media_id[:10]),
             ]
         else:
             return [
                 os.path.join(
                     self.base_path,
                     "url_cache_thumbnails",
-                    media_id[0:2],
-                    media_id[2:4],
-                    media_id[4:],
+                    _validate_path_component(media_id[0:2]),
+                    _validate_path_component(media_id[2:4]),
+                    _validate_path_component(media_id[4:]),
                 ),
                 os.path.join(
-                    self.base_path, "url_cache_thumbnails", media_id[0:2], media_id[2:4]
+                    self.base_path,
+                    "url_cache_thumbnails",
+                    _validate_path_component(media_id[0:2]),
+                    _validate_path_component(media_id[2:4]),
+                ),
+                os.path.join(
+                    self.base_path,
+                    "url_cache_thumbnails",
+                    _validate_path_component(media_id[0:2]),
                 ),
-                os.path.join(self.base_path, "url_cache_thumbnails", media_id[0:2]),
             ]