summary refs log tree commit diff
path: root/synapse/media/filepath.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2023-02-27 08:26:05 -0500
committerGitHub <noreply@github.com>2023-02-27 08:26:05 -0500
commit4fc8875876374ec8f97a3b3cc344a4e3abcf769f (patch)
tree41b920427c0f62cd1463324c89fe7f5ce3d15164 /synapse/media/filepath.py
parentSmall fixes to `MatrixFederationHttpClient` docstrings (#15148) (diff)
downloadsynapse-4fc8875876374ec8f97a3b3cc344a4e3abcf769f.tar.xz
Refactor media modules. (#15146)
* Removes the `v1` directory from `test.rest.media.v1`.
* Moves the non-REST code from `synapse.rest.media.v1` to `synapse.media`.
* Flatten the `v1` directory from `synapse.rest.media`,  but leave compatiblity
  with 3rd party media repositories and spam checkers.
Diffstat (limited to 'synapse/media/filepath.py')
-rw-r--r--synapse/media/filepath.py410
1 files changed, 410 insertions, 0 deletions
diff --git a/synapse/media/filepath.py b/synapse/media/filepath.py
new file mode 100644
index 0000000000..1f6441c412
--- /dev/null
+++ b/synapse/media/filepath.py
@@ -0,0 +1,410 @@
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2020-2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+import os
+import re
+import string
+from typing import Any, Callable, List, TypeVar, Union, cast
+
+NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
+
+
+F = TypeVar("F", bound=Callable[..., str])
+
+
+def _wrap_in_base_path(func: F) -> F:
+    """Takes a function that returns a relative path and turns it into an
+    absolute path based on the location of the primary media store
+    """
+
+    @functools.wraps(func)
+    def _wrapped(self: "MediaFilePaths", *args: Any, **kwargs: Any) -> str:
+        path = func(self, *args, **kwargs)
+        return os.path.join(self.base_path, path)
+
+    return cast(F, _wrapped)
+
+
+GetPathMethod = TypeVar(
+    "GetPathMethod", bound=Union[Callable[..., str], Callable[..., List[str]]]
+)
+
+
+def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]:
+    """Wraps a path-returning method to check that the returned path(s) do not escape
+    the media store directory.
+
+    The path-returning method may return either a single path, or a list of paths.
+
+    The check is not expected to ever fail, unless `func` is missing a call to
+    `_validate_path_component`, or `_validate_path_component` is buggy.
+
+    Args:
+        relative: A boolean indicating whether the wrapped method returns paths relative
+            to the media store directory.
+
+    Returns:
+        A method which will wrap a path-returning method, adding a check to ensure that
+        the returned path(s) lie within the media store directory. The check will raise
+        a `ValueError` if it fails.
+    """
+
+    def _wrap_with_jail_check_inner(func: GetPathMethod) -> GetPathMethod:
+        @functools.wraps(func)
+        def _wrapped(
+            self: "MediaFilePaths", *args: Any, **kwargs: Any
+        ) -> Union[str, List[str]]:
+            path_or_paths = func(self, *args, **kwargs)
+
+            if isinstance(path_or_paths, list):
+                paths_to_check = path_or_paths
+            else:
+                paths_to_check = [path_or_paths]
+
+            for path in paths_to_check:
+                # Construct the path that will ultimately be used.
+                # We cannot guess whether `path` is relative to the media store
+                # directory, since the media store directory may itself be a relative
+                # path.
+                if relative:
+                    path = os.path.join(self.base_path, path)
+                normalized_path = os.path.normpath(path)
+
+                # Now that `normpath` has eliminated `../`s and `./`s from the path,
+                # `os.path.commonpath` can be used to check whether it lies within the
+                # media store directory.
+                if (
+                    os.path.commonpath([normalized_path, self.normalized_base_path])
+                    != self.normalized_base_path
+                ):
+                    # The path resolves to outside the media store directory,
+                    # or `self.base_path` is `.`, which is an unlikely configuration.
+                    raise ValueError(f"Invalid media store path: {path!r}")
+
+                # Note that `os.path.normpath`/`abspath` has a subtle caveat:
+                # `a/b/c/../c` will normalize to `a/b/c`, but the former refers to a
+                # different path if `a/b/c` is a symlink. That is, the check above is
+                # not perfect and may allow a certain restricted subset of untrustworthy
+                # paths through. Since the check above is secondary to the main
+                # `_validate_path_component` checks, it's less important for it to be
+                # perfect.
+                #
+                # As an alternative, `os.path.realpath` will resolve symlinks, but
+                # proves problematic if there are symlinks inside the media store.
+                # eg. if `url_store/` is symlinked to elsewhere, its canonical path
+                # won't match that of the main media store directory.
+
+            return path_or_paths
+
+        return cast(GetPathMethod, _wrapped)
+
+    return _wrap_with_jail_check_inner
+
+
+ALLOWED_CHARACTERS = set(
+    string.ascii_letters
+    + string.digits
+    + "_-"
+    + ".[]:"  # Domain names, IPv6 addresses and ports in server names
+)
+FORBIDDEN_NAMES = {
+    "",
+    os.path.curdir,  # "." for the current platform
+    os.path.pardir,  # ".." for the current platform
+}
+
+
+def _validate_path_component(name: str) -> str:
+    """Checks that the given string can be safely used as a path component
+
+    Args:
+        name: The path component to check.
+
+    Returns:
+        The path component if valid.
+
+    Raises:
+        ValueError: If `name` cannot be safely used as a path component.
+    """
+    if not ALLOWED_CHARACTERS.issuperset(name) or name in FORBIDDEN_NAMES:
+        raise ValueError(f"Invalid path component: {name!r}")
+
+    return name
+
+
+class MediaFilePaths:
+    """Describes where files are stored on disk.
+
+    Most of the functions have a `*_rel` variant which returns a file path that
+    is relative to the base media store path. This is mainly used when we want
+    to write to the backup media store (when one is configured)
+    """
+
+    def __init__(self, primary_base_path: str):
+        self.base_path = primary_base_path
+        self.normalized_base_path = os.path.normpath(self.base_path)
+
+        # Refuse to initialize if paths cannot be validated correctly for the current
+        # platform.
+        assert os.path.sep not in ALLOWED_CHARACTERS
+        assert os.path.altsep not in ALLOWED_CHARACTERS
+        # On Windows, paths have all sorts of weirdness which `_validate_path_component`
+        # does not consider. In any case, the remote media store can't work correctly
+        # for certain homeservers there, since ":"s aren't allowed in paths.
+        assert os.name == "posix"
+
+    @_wrap_with_jail_check(relative=True)
+    def local_media_filepath_rel(self, media_id: str) -> str:
+        return os.path.join(
+            "local_content",
+            _validate_path_component(media_id[0:2]),
+            _validate_path_component(media_id[2:4]),
+            _validate_path_component(media_id[4:]),
+        )
+
+    local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
+
+    @_wrap_with_jail_check(relative=True)
+    def local_media_thumbnail_rel(
+        self, media_id: str, width: int, height: int, content_type: str, method: str
+    ) -> str:
+        top_level_type, sub_type = content_type.split("/")
+        file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
+        return os.path.join(
+            "local_thumbnails",
+            _validate_path_component(media_id[0:2]),
+            _validate_path_component(media_id[2:4]),
+            _validate_path_component(media_id[4:]),
+            _validate_path_component(file_name),
+        )
+
+    local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
+
+    @_wrap_with_jail_check(relative=False)
+    def local_media_thumbnail_dir(self, media_id: str) -> str:
+        """
+        Retrieve the local store path of thumbnails of a given media_id
+
+        Args:
+            media_id: The media ID to query.
+        Returns:
+            Path of local_thumbnails from media_id
+        """
+        return os.path.join(
+            self.base_path,
+            "local_thumbnails",
+            _validate_path_component(media_id[0:2]),
+            _validate_path_component(media_id[2:4]),
+            _validate_path_component(media_id[4:]),
+        )
+
+    @_wrap_with_jail_check(relative=True)
+    def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
+        return os.path.join(
+            "remote_content",
+            _validate_path_component(server_name),
+            _validate_path_component(file_id[0:2]),
+            _validate_path_component(file_id[2:4]),
+            _validate_path_component(file_id[4:]),
+        )
+
+    remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
+
+    @_wrap_with_jail_check(relative=True)
+    def remote_media_thumbnail_rel(
+        self,
+        server_name: str,
+        file_id: str,
+        width: int,
+        height: int,
+        content_type: str,
+        method: str,
+    ) -> str:
+        top_level_type, sub_type = content_type.split("/")
+        file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
+        return os.path.join(
+            "remote_thumbnail",
+            _validate_path_component(server_name),
+            _validate_path_component(file_id[0:2]),
+            _validate_path_component(file_id[2:4]),
+            _validate_path_component(file_id[4:]),
+            _validate_path_component(file_name),
+        )
+
+    remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel)
+
+    # Legacy path that was used to store thumbnails previously.
+    # Should be removed after some time, when most of the thumbnails are stored
+    # using the new path.
+    @_wrap_with_jail_check(relative=True)
+    def remote_media_thumbnail_rel_legacy(
+        self, server_name: str, file_id: str, width: int, height: int, content_type: str
+    ) -> str:
+        top_level_type, sub_type = content_type.split("/")
+        file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type)
+        return os.path.join(
+            "remote_thumbnail",
+            _validate_path_component(server_name),
+            _validate_path_component(file_id[0:2]),
+            _validate_path_component(file_id[2:4]),
+            _validate_path_component(file_id[4:]),
+            _validate_path_component(file_name),
+        )
+
+    @_wrap_with_jail_check(relative=False)
+    def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
+        return os.path.join(
+            self.base_path,
+            "remote_thumbnail",
+            _validate_path_component(server_name),
+            _validate_path_component(file_id[0:2]),
+            _validate_path_component(file_id[2:4]),
+            _validate_path_component(file_id[4:]),
+        )
+
+    @_wrap_with_jail_check(relative=True)
+    def url_cache_filepath_rel(self, media_id: str) -> str:
+        if NEW_FORMAT_ID_RE.match(media_id):
+            # Media id is of the form <DATE><RANDOM_STRING>
+            # E.g.: 2017-09-28-fsdRDt24DS234dsf
+            return os.path.join(
+                "url_cache",
+                _validate_path_component(media_id[:10]),
+                _validate_path_component(media_id[11:]),
+            )
+        else:
+            return os.path.join(
+                "url_cache",
+                _validate_path_component(media_id[0:2]),
+                _validate_path_component(media_id[2:4]),
+                _validate_path_component(media_id[4:]),
+            )
+
+    url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
+
+    @_wrap_with_jail_check(relative=False)
+    def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
+        "The dirs to try and remove if we delete the media_id file"
+        if NEW_FORMAT_ID_RE.match(media_id):
+            return [
+                os.path.join(
+                    self.base_path, "url_cache", _validate_path_component(media_id[:10])
+                )
+            ]
+        else:
+            return [
+                os.path.join(
+                    self.base_path,
+                    "url_cache",
+                    _validate_path_component(media_id[0:2]),
+                    _validate_path_component(media_id[2:4]),
+                ),
+                os.path.join(
+                    self.base_path, "url_cache", _validate_path_component(media_id[0:2])
+                ),
+            ]
+
+    @_wrap_with_jail_check(relative=True)
+    def url_cache_thumbnail_rel(
+        self, media_id: str, width: int, height: int, content_type: str, method: str
+    ) -> str:
+        # Media id is of the form <DATE><RANDOM_STRING>
+        # E.g.: 2017-09-28-fsdRDt24DS234dsf
+
+        top_level_type, sub_type = content_type.split("/")
+        file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
+
+        if NEW_FORMAT_ID_RE.match(media_id):
+            return os.path.join(
+                "url_cache_thumbnails",
+                _validate_path_component(media_id[:10]),
+                _validate_path_component(media_id[11:]),
+                _validate_path_component(file_name),
+            )
+        else:
+            return os.path.join(
+                "url_cache_thumbnails",
+                _validate_path_component(media_id[0:2]),
+                _validate_path_component(media_id[2:4]),
+                _validate_path_component(media_id[4:]),
+                _validate_path_component(file_name),
+            )
+
+    url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
+
+    @_wrap_with_jail_check(relative=True)
+    def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
+        # Media id is of the form <DATE><RANDOM_STRING>
+        # E.g.: 2017-09-28-fsdRDt24DS234dsf
+
+        if NEW_FORMAT_ID_RE.match(media_id):
+            return os.path.join(
+                "url_cache_thumbnails",
+                _validate_path_component(media_id[:10]),
+                _validate_path_component(media_id[11:]),
+            )
+        else:
+            return os.path.join(
+                "url_cache_thumbnails",
+                _validate_path_component(media_id[0:2]),
+                _validate_path_component(media_id[2:4]),
+                _validate_path_component(media_id[4:]),
+            )
+
+    url_cache_thumbnail_directory = _wrap_in_base_path(
+        url_cache_thumbnail_directory_rel
+    )
+
+    @_wrap_with_jail_check(relative=False)
+    def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
+        "The dirs to try and remove if we delete the media_id thumbnails"
+        # Media id is of the form <DATE><RANDOM_STRING>
+        # E.g.: 2017-09-28-fsdRDt24DS234dsf
+        if NEW_FORMAT_ID_RE.match(media_id):
+            return [
+                os.path.join(
+                    self.base_path,
+                    "url_cache_thumbnails",
+                    _validate_path_component(media_id[:10]),
+                    _validate_path_component(media_id[11:]),
+                ),
+                os.path.join(
+                    self.base_path,
+                    "url_cache_thumbnails",
+                    _validate_path_component(media_id[:10]),
+                ),
+            ]
+        else:
+            return [
+                os.path.join(
+                    self.base_path,
+                    "url_cache_thumbnails",
+                    _validate_path_component(media_id[0:2]),
+                    _validate_path_component(media_id[2:4]),
+                    _validate_path_component(media_id[4:]),
+                ),
+                os.path.join(
+                    self.base_path,
+                    "url_cache_thumbnails",
+                    _validate_path_component(media_id[0:2]),
+                    _validate_path_component(media_id[2:4]),
+                ),
+                os.path.join(
+                    self.base_path,
+                    "url_cache_thumbnails",
+                    _validate_path_component(media_id[0:2]),
+                ),
+            ]