summary refs log tree commit diff
path: root/synapse/rest/media/v1/filepath.py
diff options
context:
space:
mode:
authorSean Quah <8349537+squahtx@users.noreply.github.com>2021-12-02 16:05:24 +0000
committerGitHub <noreply@github.com>2021-12-02 16:05:24 +0000
commit858d80bf0f9f656a03992794874081b806e49222 (patch)
treeb6be5ccb0c49765e9b0ccdd94122b63562aab839 /synapse/rest/media/v1/filepath.py
parentAdd type annotations to `tests.storage.test_appservice`. (#11488) (diff)
downloadsynapse-858d80bf0f9f656a03992794874081b806e49222.tar.xz
Fix media repository failing when media store path contains symlinks (#11446)
Diffstat (limited to 'synapse/rest/media/v1/filepath.py')
-rw-r--r--synapse/rest/media/v1/filepath.py115
1 files changed, 71 insertions, 44 deletions
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index c0e15c6513..1f6441c412 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -43,47 +43,75 @@ GetPathMethod = TypeVar(
 )
 
 
-def _wrap_with_jail_check(func: GetPathMethod) -> GetPathMethod:
+def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]:
     """Wraps a path-returning method to check that the returned path(s) do not escape
     the media store directory.
 
+    The path-returning method may return either a single path, or a list of paths.
+
     The check is not expected to ever fail, unless `func` is missing a call to
     `_validate_path_component`, or `_validate_path_component` is buggy.
 
     Args:
-        func: The `MediaFilePaths` method to wrap. The method may return either a single
-            path, or a list of paths. Returned paths may be either absolute or relative.
+        relative: A boolean indicating whether the wrapped method returns paths relative
+            to the media store directory.
 
     Returns:
-        The method, wrapped with a check to ensure that the returned path(s) lie within
-        the media store directory. Raises a `ValueError` if the check fails.
+        A method which will wrap a path-returning method, adding a check to ensure that
+        the returned path(s) lie within the media store directory. The check will raise
+        a `ValueError` if it fails.
     """
 
-    @functools.wraps(func)
-    def _wrapped(
-        self: "MediaFilePaths", *args: Any, **kwargs: Any
-    ) -> Union[str, List[str]]:
-        path_or_paths = func(self, *args, **kwargs)
-
-        if isinstance(path_or_paths, list):
-            paths_to_check = path_or_paths
-        else:
-            paths_to_check = [path_or_paths]
-
-        for path in paths_to_check:
-            # path may be an absolute or relative path, depending on the method being
-            # wrapped. When "appending" an absolute path, `os.path.join` discards the
-            # previous path, which is desired here.
-            normalized_path = os.path.normpath(os.path.join(self.real_base_path, path))
-            if (
-                os.path.commonpath([normalized_path, self.real_base_path])
-                != self.real_base_path
-            ):
-                raise ValueError(f"Invalid media store path: {path!r}")
-
-        return path_or_paths
-
-    return cast(GetPathMethod, _wrapped)
+    def _wrap_with_jail_check_inner(func: GetPathMethod) -> GetPathMethod:
+        @functools.wraps(func)
+        def _wrapped(
+            self: "MediaFilePaths", *args: Any, **kwargs: Any
+        ) -> Union[str, List[str]]:
+            path_or_paths = func(self, *args, **kwargs)
+
+            if isinstance(path_or_paths, list):
+                paths_to_check = path_or_paths
+            else:
+                paths_to_check = [path_or_paths]
+
+            for path in paths_to_check:
+                # Construct the path that will ultimately be used.
+                # We cannot guess whether `path` is relative to the media store
+                # directory, since the media store directory may itself be a relative
+                # path.
+                if relative:
+                    path = os.path.join(self.base_path, path)
+                normalized_path = os.path.normpath(path)
+
+                # Now that `normpath` has eliminated `../`s and `./`s from the path,
+                # `os.path.commonpath` can be used to check whether it lies within the
+                # media store directory.
+                if (
+                    os.path.commonpath([normalized_path, self.normalized_base_path])
+                    != self.normalized_base_path
+                ):
+                    # The path resolves to outside the media store directory,
+                    # or `self.base_path` is `.`, which is an unlikely configuration.
+                    raise ValueError(f"Invalid media store path: {path!r}")
+
+                # Note that `os.path.normpath`/`abspath` has a subtle caveat:
+                # `a/b/c/../c` will normalize to `a/b/c`, but the former refers to a
+                # different path if `a/b/c` is a symlink. That is, the check above is
+                # not perfect and may allow a certain restricted subset of untrustworthy
+                # paths through. Since the check above is secondary to the main
+                # `_validate_path_component` checks, it's less important for it to be
+                # perfect.
+                #
+                # As an alternative, `os.path.realpath` will resolve symlinks, but
+                # proves problematic if there are symlinks inside the media store.
+                # eg. if `url_store/` is symlinked to elsewhere, its canonical path
+                # won't match that of the main media store directory.
+
+            return path_or_paths
+
+        return cast(GetPathMethod, _wrapped)
+
+    return _wrap_with_jail_check_inner
 
 
 ALLOWED_CHARACTERS = set(
@@ -127,9 +155,7 @@ class MediaFilePaths:
 
     def __init__(self, primary_base_path: str):
         self.base_path = primary_base_path
-
-        # The media store directory, with all symlinks resolved.
-        self.real_base_path = os.path.realpath(primary_base_path)
+        self.normalized_base_path = os.path.normpath(self.base_path)
 
         # Refuse to initialize if paths cannot be validated correctly for the current
         # platform.
@@ -140,7 +166,7 @@ class MediaFilePaths:
         # for certain homeservers there, since ":"s aren't allowed in paths.
         assert os.name == "posix"
 
-    @_wrap_with_jail_check
+    @_wrap_with_jail_check(relative=True)
     def local_media_filepath_rel(self, media_id: str) -> str:
         return os.path.join(
             "local_content",
@@ -151,7 +177,7 @@ class MediaFilePaths:
 
     local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
 
-    @_wrap_with_jail_check
+    @_wrap_with_jail_check(relative=True)
     def local_media_thumbnail_rel(
         self, media_id: str, width: int, height: int, content_type: str, method: str
     ) -> str:
@@ -167,7 +193,7 @@ class MediaFilePaths:
 
     local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
 
-    @_wrap_with_jail_check
+    @_wrap_with_jail_check(relative=False)
     def local_media_thumbnail_dir(self, media_id: str) -> str:
         """
         Retrieve the local store path of thumbnails of a given media_id
@@ -185,7 +211,7 @@ class MediaFilePaths:
             _validate_path_component(media_id[4:]),
         )
 
-    @_wrap_with_jail_check
+    @_wrap_with_jail_check(relative=True)
     def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
         return os.path.join(
             "remote_content",
@@ -197,7 +223,7 @@ class MediaFilePaths:
 
     remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
 
-    @_wrap_with_jail_check
+    @_wrap_with_jail_check(relative=True)
     def remote_media_thumbnail_rel(
         self,
         server_name: str,
@@ -223,7 +249,7 @@ class MediaFilePaths:
     # Legacy path that was used to store thumbnails previously.
     # Should be removed after some time, when most of the thumbnails are stored
     # using the new path.
-    @_wrap_with_jail_check
+    @_wrap_with_jail_check(relative=True)
     def remote_media_thumbnail_rel_legacy(
         self, server_name: str, file_id: str, width: int, height: int, content_type: str
     ) -> str:
@@ -238,6 +264,7 @@ class MediaFilePaths:
             _validate_path_component(file_name),
         )
 
+    @_wrap_with_jail_check(relative=False)
     def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
         return os.path.join(
             self.base_path,
@@ -248,7 +275,7 @@ class MediaFilePaths:
             _validate_path_component(file_id[4:]),
         )
 
-    @_wrap_with_jail_check
+    @_wrap_with_jail_check(relative=True)
     def url_cache_filepath_rel(self, media_id: str) -> str:
         if NEW_FORMAT_ID_RE.match(media_id):
             # Media id is of the form <DATE><RANDOM_STRING>
@@ -268,7 +295,7 @@ class MediaFilePaths:
 
     url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
 
-    @_wrap_with_jail_check
+    @_wrap_with_jail_check(relative=False)
     def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
         "The dirs to try and remove if we delete the media_id file"
         if NEW_FORMAT_ID_RE.match(media_id):
@@ -290,7 +317,7 @@ class MediaFilePaths:
                 ),
             ]
 
-    @_wrap_with_jail_check
+    @_wrap_with_jail_check(relative=True)
     def url_cache_thumbnail_rel(
         self, media_id: str, width: int, height: int, content_type: str, method: str
     ) -> str:
@@ -318,7 +345,7 @@ class MediaFilePaths:
 
     url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
 
-    @_wrap_with_jail_check
+    @_wrap_with_jail_check(relative=True)
     def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
         # Media id is of the form <DATE><RANDOM_STRING>
         # E.g.: 2017-09-28-fsdRDt24DS234dsf
@@ -341,7 +368,7 @@ class MediaFilePaths:
         url_cache_thumbnail_directory_rel
     )
 
-    @_wrap_with_jail_check
+    @_wrap_with_jail_check(relative=False)
     def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
         "The dirs to try and remove if we delete the media_id thumbnails"
         # Media id is of the form <DATE><RANDOM_STRING>