diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index 014fa893d6..9b40fd8a6c 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -29,7 +29,7 @@ from synapse.api.errors import Codes, SynapseError, cs_error
from synapse.http.server import finish_request, respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
-from synapse.util.stringutils import is_ascii
+from synapse.util.stringutils import is_ascii, parse_and_validate_server_name
logger = logging.getLogger(__name__)
@@ -51,6 +51,19 @@ TEXT_CONTENT_TYPES = [
def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]:
+ """Parses the server name, media ID and optional file name from the request URI
+
+ Also performs some rough validation on the server name.
+
+ Args:
+ request: The `Request`.
+
+ Returns:
+ A tuple containing the parsed server name, media ID and optional file name.
+
+ Raises:
+ SynapseError(404): if parsing or validation fail for any reason
+ """
try:
# The type on postpath seems incorrect in Twisted 21.2.0.
postpath: List[bytes] = request.postpath # type: ignore
@@ -62,6 +75,9 @@ def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]:
server_name = server_name_bytes.decode("utf-8")
media_id = media_id_bytes.decode("utf8")
+ # Validate the server name, raising if invalid
+ parse_and_validate_server_name(server_name)
+
file_name = None
if len(postpath) > 2:
try:
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index bec77088ee..c0e15c6513 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -16,7 +16,8 @@
import functools
import os
import re
-from typing import Any, Callable, List, TypeVar, cast
+import string
+from typing import Any, Callable, List, TypeVar, Union, cast
NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
@@ -37,6 +38,85 @@ def _wrap_in_base_path(func: F) -> F:
return cast(F, _wrapped)
+GetPathMethod = TypeVar(
+ "GetPathMethod", bound=Union[Callable[..., str], Callable[..., List[str]]]
+)
+
+
+def _wrap_with_jail_check(func: GetPathMethod) -> GetPathMethod:
+ """Wraps a path-returning method to check that the returned path(s) do not escape
+ the media store directory.
+
+ The check is not expected to ever fail, unless `func` is missing a call to
+ `_validate_path_component`, or `_validate_path_component` is buggy.
+
+ Args:
+ func: The `MediaFilePaths` method to wrap. The method may return either a single
+ path, or a list of paths. Returned paths may be either absolute or relative.
+
+ Returns:
+ The method, wrapped with a check to ensure that the returned path(s) lie within
+ the media store directory. Raises a `ValueError` if the check fails.
+ """
+
+ @functools.wraps(func)
+ def _wrapped(
+ self: "MediaFilePaths", *args: Any, **kwargs: Any
+ ) -> Union[str, List[str]]:
+ path_or_paths = func(self, *args, **kwargs)
+
+ if isinstance(path_or_paths, list):
+ paths_to_check = path_or_paths
+ else:
+ paths_to_check = [path_or_paths]
+
+ for path in paths_to_check:
+ # path may be an absolute or relative path, depending on the method being
+ # wrapped. When "appending" an absolute path, `os.path.join` discards the
+ # previous path, which is desired here.
+ normalized_path = os.path.normpath(os.path.join(self.real_base_path, path))
+ if (
+ os.path.commonpath([normalized_path, self.real_base_path])
+ != self.real_base_path
+ ):
+ raise ValueError(f"Invalid media store path: {path!r}")
+
+ return path_or_paths
+
+ return cast(GetPathMethod, _wrapped)
+
+
+ALLOWED_CHARACTERS = set(
+ string.ascii_letters
+ + string.digits
+ + "_-"
+ + ".[]:" # Domain names, IPv6 addresses and ports in server names
+)
+FORBIDDEN_NAMES = {
+ "",
+ os.path.curdir, # "." for the current platform
+ os.path.pardir, # ".." for the current platform
+}
+
+
+def _validate_path_component(name: str) -> str:
+ """Checks that the given string can be safely used as a path component
+
+ Args:
+ name: The path component to check.
+
+ Returns:
+ The path component if valid.
+
+ Raises:
+ ValueError: If `name` cannot be safely used as a path component.
+ """
+ if not ALLOWED_CHARACTERS.issuperset(name) or name in FORBIDDEN_NAMES:
+ raise ValueError(f"Invalid path component: {name!r}")
+
+ return name
+
+
class MediaFilePaths:
"""Describes where files are stored on disk.
@@ -48,22 +128,46 @@ class MediaFilePaths:
def __init__(self, primary_base_path: str):
self.base_path = primary_base_path
+ # The media store directory, with all symlinks resolved.
+ self.real_base_path = os.path.realpath(primary_base_path)
+
+ # Refuse to initialize if paths cannot be validated correctly for the current
+ # platform.
+ assert os.path.sep not in ALLOWED_CHARACTERS
+ assert os.path.altsep not in ALLOWED_CHARACTERS
+ # On Windows, paths have all sorts of weirdness which `_validate_path_component`
+ # does not consider. In any case, the remote media store can't work correctly
+ # for certain homeservers there, since ":"s aren't allowed in paths.
+ assert os.name == "posix"
+
+ @_wrap_with_jail_check
def local_media_filepath_rel(self, media_id: str) -> str:
- return os.path.join("local_content", media_id[0:2], media_id[2:4], media_id[4:])
+ return os.path.join(
+ "local_content",
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
+ )
local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)
+ @_wrap_with_jail_check
def local_media_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str:
top_level_type, sub_type = content_type.split("/")
file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
return os.path.join(
- "local_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], file_name
+ "local_thumbnails",
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
+ _validate_path_component(file_name),
)
local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
+ @_wrap_with_jail_check
def local_media_thumbnail_dir(self, media_id: str) -> str:
"""
Retrieve the local store path of thumbnails of a given media_id
@@ -76,18 +180,24 @@ class MediaFilePaths:
return os.path.join(
self.base_path,
"local_thumbnails",
- media_id[0:2],
- media_id[2:4],
- media_id[4:],
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
)
+ @_wrap_with_jail_check
def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
return os.path.join(
- "remote_content", server_name, file_id[0:2], file_id[2:4], file_id[4:]
+ "remote_content",
+ _validate_path_component(server_name),
+ _validate_path_component(file_id[0:2]),
+ _validate_path_component(file_id[2:4]),
+ _validate_path_component(file_id[4:]),
)
remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)
+ @_wrap_with_jail_check
def remote_media_thumbnail_rel(
self,
server_name: str,
@@ -101,11 +211,11 @@ class MediaFilePaths:
file_name = "%i-%i-%s-%s-%s" % (width, height, top_level_type, sub_type, method)
return os.path.join(
"remote_thumbnail",
- server_name,
- file_id[0:2],
- file_id[2:4],
- file_id[4:],
- file_name,
+ _validate_path_component(server_name),
+ _validate_path_component(file_id[0:2]),
+ _validate_path_component(file_id[2:4]),
+ _validate_path_component(file_id[4:]),
+ _validate_path_component(file_name),
)
remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel)
@@ -113,6 +223,7 @@ class MediaFilePaths:
# Legacy path that was used to store thumbnails previously.
# Should be removed after some time, when most of the thumbnails are stored
# using the new path.
+ @_wrap_with_jail_check
def remote_media_thumbnail_rel_legacy(
self, server_name: str, file_id: str, width: int, height: int, content_type: str
) -> str:
@@ -120,43 +231,66 @@ class MediaFilePaths:
file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type)
return os.path.join(
"remote_thumbnail",
- server_name,
- file_id[0:2],
- file_id[2:4],
- file_id[4:],
- file_name,
+ _validate_path_component(server_name),
+ _validate_path_component(file_id[0:2]),
+ _validate_path_component(file_id[2:4]),
+ _validate_path_component(file_id[4:]),
+ _validate_path_component(file_name),
)
def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
return os.path.join(
self.base_path,
"remote_thumbnail",
- server_name,
- file_id[0:2],
- file_id[2:4],
- file_id[4:],
+ _validate_path_component(server_name),
+ _validate_path_component(file_id[0:2]),
+ _validate_path_component(file_id[2:4]),
+ _validate_path_component(file_id[4:]),
)
+ @_wrap_with_jail_check
def url_cache_filepath_rel(self, media_id: str) -> str:
if NEW_FORMAT_ID_RE.match(media_id):
# Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf
- return os.path.join("url_cache", media_id[:10], media_id[11:])
+ return os.path.join(
+ "url_cache",
+ _validate_path_component(media_id[:10]),
+ _validate_path_component(media_id[11:]),
+ )
else:
- return os.path.join("url_cache", media_id[0:2], media_id[2:4], media_id[4:])
+ return os.path.join(
+ "url_cache",
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
+ )
url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)
+ @_wrap_with_jail_check
def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id file"
if NEW_FORMAT_ID_RE.match(media_id):
- return [os.path.join(self.base_path, "url_cache", media_id[:10])]
+ return [
+ os.path.join(
+ self.base_path, "url_cache", _validate_path_component(media_id[:10])
+ )
+ ]
else:
return [
- os.path.join(self.base_path, "url_cache", media_id[0:2], media_id[2:4]),
- os.path.join(self.base_path, "url_cache", media_id[0:2]),
+ os.path.join(
+ self.base_path,
+ "url_cache",
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ ),
+ os.path.join(
+ self.base_path, "url_cache", _validate_path_component(media_id[0:2])
+ ),
]
+ @_wrap_with_jail_check
def url_cache_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str:
@@ -168,37 +302,46 @@ class MediaFilePaths:
if NEW_FORMAT_ID_RE.match(media_id):
return os.path.join(
- "url_cache_thumbnails", media_id[:10], media_id[11:], file_name
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[:10]),
+ _validate_path_component(media_id[11:]),
+ _validate_path_component(file_name),
)
else:
return os.path.join(
"url_cache_thumbnails",
- media_id[0:2],
- media_id[2:4],
- media_id[4:],
- file_name,
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
+ _validate_path_component(file_name),
)
url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)
+ @_wrap_with_jail_check
def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
# Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf
if NEW_FORMAT_ID_RE.match(media_id):
- return os.path.join("url_cache_thumbnails", media_id[:10], media_id[11:])
+ return os.path.join(
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[:10]),
+ _validate_path_component(media_id[11:]),
+ )
else:
return os.path.join(
"url_cache_thumbnails",
- media_id[0:2],
- media_id[2:4],
- media_id[4:],
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
)
url_cache_thumbnail_directory = _wrap_in_base_path(
url_cache_thumbnail_directory_rel
)
+ @_wrap_with_jail_check
def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id thumbnails"
# Media id is of the form <DATE><RANDOM_STRING>
@@ -206,21 +349,35 @@ class MediaFilePaths:
if NEW_FORMAT_ID_RE.match(media_id):
return [
os.path.join(
- self.base_path, "url_cache_thumbnails", media_id[:10], media_id[11:]
+ self.base_path,
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[:10]),
+ _validate_path_component(media_id[11:]),
+ ),
+ os.path.join(
+ self.base_path,
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[:10]),
),
- os.path.join(self.base_path, "url_cache_thumbnails", media_id[:10]),
]
else:
return [
os.path.join(
self.base_path,
"url_cache_thumbnails",
- media_id[0:2],
- media_id[2:4],
- media_id[4:],
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ _validate_path_component(media_id[4:]),
),
os.path.join(
- self.base_path, "url_cache_thumbnails", media_id[0:2], media_id[2:4]
+ self.base_path,
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[0:2]),
+ _validate_path_component(media_id[2:4]),
+ ),
+ os.path.join(
+ self.base_path,
+ "url_cache_thumbnails",
+ _validate_path_component(media_id[0:2]),
),
- os.path.join(self.base_path, "url_cache_thumbnails", media_id[0:2]),
]
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index f029432191..ea1032b4fc 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -19,6 +19,8 @@ import string
from collections.abc import Iterable
from typing import Optional, Tuple
+from netaddr import valid_ipv6
+
from synapse.api.errors import Codes, SynapseError
_string_with_symbols = string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
@@ -97,7 +99,10 @@ def parse_server_name(server_name: str) -> Tuple[str, Optional[int]]:
raise ValueError("Invalid server name '%s'" % server_name)
-VALID_HOST_REGEX = re.compile("\\A[0-9a-zA-Z.-]+\\Z")
+# An approximation of the domain name syntax in RFC 1035, section 2.3.1.
+# NB: "\Z" is not equivalent to "$".
+# The latter will match the position before a "\n" at the end of a string.
+VALID_HOST_REGEX = re.compile("\\A[0-9a-zA-Z-]+(?:\\.[0-9a-zA-Z-]+)*\\Z")
def parse_and_validate_server_name(server_name: str) -> Tuple[str, Optional[int]]:
@@ -122,13 +127,15 @@ def parse_and_validate_server_name(server_name: str) -> Tuple[str, Optional[int]
if host[0] == "[":
if host[-1] != "]":
raise ValueError("Mismatched [...] in server name '%s'" % (server_name,))
- return host, port
- # otherwise it should only be alphanumerics.
- if not VALID_HOST_REGEX.match(host):
- raise ValueError(
- "Server name '%s' contains invalid characters" % (server_name,)
- )
+ # valid_ipv6 raises when given an empty string
+ ipv6_address = host[1:-1]
+ if not ipv6_address or not valid_ipv6(ipv6_address):
+ raise ValueError(
+ "Server name '%s' is not a valid IPv6 address" % (server_name,)
+ )
+ elif not VALID_HOST_REGEX.match(host):
+ raise ValueError("Server name '%s' has an invalid format" % (server_name,))
return host, port
diff --git a/tests/http/test_endpoint.py b/tests/http/test_endpoint.py
index 1f9a2f9b1d..c8cc21cadd 100644
--- a/tests/http/test_endpoint.py
+++ b/tests/http/test_endpoint.py
@@ -36,8 +36,11 @@ class ServerNameTestCase(unittest.TestCase):
"localhost:http", # non-numeric port
"1234]", # smells like ipv6 literal but isn't
"[1234",
+ "[1.2.3.4]",
"underscore_.com",
"percent%65.com",
+ "newline.com\n",
+ ".empty-label.com",
"1234:5678:80", # too many colons
]
for i in test_data:
diff --git a/tests/rest/media/v1/test_filepath.py b/tests/rest/media/v1/test_filepath.py
index 09504a485f..8fe94f7d85 100644
--- a/tests/rest/media/v1/test_filepath.py
+++ b/tests/rest/media/v1/test_filepath.py
@@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import inspect
+from typing import Iterable
+
from synapse.rest.media.v1.filepath import MediaFilePaths
from tests import unittest
@@ -236,3 +239,250 @@ class MediaFilePathsTestCase(unittest.TestCase):
"/media_store/url_cache_thumbnails/Ge",
],
)
+
+ def test_server_name_validation(self):
+ """Test validation of server names"""
+ self._test_path_validation(
+ [
+ "remote_media_filepath_rel",
+ "remote_media_filepath",
+ "remote_media_thumbnail_rel",
+ "remote_media_thumbnail",
+ "remote_media_thumbnail_rel_legacy",
+ "remote_media_thumbnail_dir",
+ ],
+ parameter="server_name",
+ valid_values=[
+ "matrix.org",
+ "matrix.org:8448",
+ "matrix-federation.matrix.org",
+ "matrix-federation.matrix.org:8448",
+ "10.1.12.123",
+ "10.1.12.123:8448",
+ "[fd00:abcd::ffff]",
+ "[fd00:abcd::ffff]:8448",
+ ],
+ invalid_values=[
+ "/matrix.org",
+ "matrix.org/..",
+ "matrix.org\x00",
+ "",
+ ".",
+ "..",
+ "/",
+ ],
+ )
+
+ def test_file_id_validation(self):
+ """Test validation of local, remote and legacy URL cache file / media IDs"""
+ # File / media IDs get split into three parts to form paths, consisting of the
+ # first two characters, next two characters and rest of the ID.
+ valid_file_ids = [
+ "GerZNDnDZVjsOtardLuwfIBg",
+ # Unexpected, but produces an acceptable path:
+ "GerZN", # "N" becomes the last directory
+ ]
+ invalid_file_ids = [
+ "/erZNDnDZVjsOtardLuwfIBg",
+ "Ge/ZNDnDZVjsOtardLuwfIBg",
+ "GerZ/DnDZVjsOtardLuwfIBg",
+ "GerZ/..",
+ "G\x00rZNDnDZVjsOtardLuwfIBg",
+ "Ger\x00NDnDZVjsOtardLuwfIBg",
+ "GerZNDnDZVjsOtardLuwfIBg\x00",
+ "",
+ "Ge",
+ "GerZ",
+ "GerZ.",
+ "..rZNDnDZVjsOtardLuwfIBg",
+ "Ge..NDnDZVjsOtardLuwfIBg",
+ "GerZ..",
+ "GerZ/",
+ ]
+
+ self._test_path_validation(
+ [
+ "local_media_filepath_rel",
+ "local_media_filepath",
+ "local_media_thumbnail_rel",
+ "local_media_thumbnail",
+ "local_media_thumbnail_dir",
+ # Legacy URL cache media IDs
+ "url_cache_filepath_rel",
+ "url_cache_filepath",
+ # `url_cache_filepath_dirs_to_delete` is tested below.
+ "url_cache_thumbnail_rel",
+ "url_cache_thumbnail",
+ "url_cache_thumbnail_directory_rel",
+ "url_cache_thumbnail_directory",
+ "url_cache_thumbnail_dirs_to_delete",
+ ],
+ parameter="media_id",
+ valid_values=valid_file_ids,
+ invalid_values=invalid_file_ids,
+ )
+
+ # `url_cache_filepath_dirs_to_delete` ignores what would be the last path
+ # component, so only the first 4 characters matter.
+ self._test_path_validation(
+ [
+ "url_cache_filepath_dirs_to_delete",
+ ],
+ parameter="media_id",
+ valid_values=valid_file_ids,
+ invalid_values=[
+ "/erZNDnDZVjsOtardLuwfIBg",
+ "Ge/ZNDnDZVjsOtardLuwfIBg",
+ "G\x00rZNDnDZVjsOtardLuwfIBg",
+ "Ger\x00NDnDZVjsOtardLuwfIBg",
+ "",
+ "Ge",
+ "..rZNDnDZVjsOtardLuwfIBg",
+ "Ge..NDnDZVjsOtardLuwfIBg",
+ ],
+ )
+
+ self._test_path_validation(
+ [
+ "remote_media_filepath_rel",
+ "remote_media_filepath",
+ "remote_media_thumbnail_rel",
+ "remote_media_thumbnail",
+ "remote_media_thumbnail_rel_legacy",
+ "remote_media_thumbnail_dir",
+ ],
+ parameter="file_id",
+ valid_values=valid_file_ids,
+ invalid_values=invalid_file_ids,
+ )
+
+ def test_url_cache_media_id_validation(self):
+ """Test validation of URL cache media IDs"""
+ self._test_path_validation(
+ [
+ "url_cache_filepath_rel",
+ "url_cache_filepath",
+ # `url_cache_filepath_dirs_to_delete` only cares about the date prefix
+ "url_cache_thumbnail_rel",
+ "url_cache_thumbnail",
+ "url_cache_thumbnail_directory_rel",
+ "url_cache_thumbnail_directory",
+ "url_cache_thumbnail_dirs_to_delete",
+ ],
+ parameter="media_id",
+ valid_values=[
+ "2020-01-02_GerZNDnDZVjsOtar",
+ "2020-01-02_G", # Unexpected, but produces an acceptable path
+ ],
+ invalid_values=[
+ "2020-01-02",
+ "2020-01-02-",
+ "2020-01-02-.",
+ "2020-01-02-..",
+ "2020-01-02-/",
+ "2020-01-02-/GerZNDnDZVjsOtar",
+ "2020-01-02-GerZNDnDZVjsOtar/..",
+ "2020-01-02-GerZNDnDZVjsOtar\x00",
+ ],
+ )
+
+ def test_content_type_validation(self):
+ """Test validation of thumbnail content types"""
+ self._test_path_validation(
+ [
+ "local_media_thumbnail_rel",
+ "local_media_thumbnail",
+ "remote_media_thumbnail_rel",
+ "remote_media_thumbnail",
+ "remote_media_thumbnail_rel_legacy",
+ "url_cache_thumbnail_rel",
+ "url_cache_thumbnail",
+ ],
+ parameter="content_type",
+ valid_values=[
+ "image/jpeg",
+ ],
+ invalid_values=[
+ "", # ValueError: not enough values to unpack
+ "image/jpeg/abc", # ValueError: too many values to unpack
+ "image/jpeg\x00",
+ ],
+ )
+
+ def test_thumbnail_method_validation(self):
+ """Test validation of thumbnail methods"""
+ self._test_path_validation(
+ [
+ "local_media_thumbnail_rel",
+ "local_media_thumbnail",
+ "remote_media_thumbnail_rel",
+ "remote_media_thumbnail",
+ "url_cache_thumbnail_rel",
+ "url_cache_thumbnail",
+ ],
+ parameter="method",
+ valid_values=[
+ "crop",
+ "scale",
+ ],
+ invalid_values=[
+ "/scale",
+ "scale/..",
+ "scale\x00",
+ "/",
+ ],
+ )
+
+ def _test_path_validation(
+ self,
+ methods: Iterable[str],
+ parameter: str,
+ valid_values: Iterable[str],
+ invalid_values: Iterable[str],
+ ):
+ """Test that the specified methods validate the named parameter as expected
+
+ Args:
+ methods: The names of `MediaFilePaths` methods to test
+ parameter: The name of the parameter to test
+ valid_values: A list of parameter values that are expected to be accepted
+ invalid_values: A list of parameter values that are expected to be rejected
+
+ Raises:
+ AssertionError: If a value was accepted when it should have failed
+ validation.
+ ValueError: If a value failed validation when it should have been accepted.
+ """
+ for method in methods:
+ get_path = getattr(self.filepaths, method)
+
+ parameters = inspect.signature(get_path).parameters
+ kwargs = {
+ "server_name": "matrix.org",
+ "media_id": "GerZNDnDZVjsOtardLuwfIBg",
+ "file_id": "GerZNDnDZVjsOtardLuwfIBg",
+ "width": 800,
+ "height": 600,
+ "content_type": "image/jpeg",
+ "method": "scale",
+ }
+
+ if get_path.__name__.startswith("url_"):
+ kwargs["media_id"] = "2020-01-02_GerZNDnDZVjsOtar"
+
+ kwargs = {k: v for k, v in kwargs.items() if k in parameters}
+ kwargs.pop(parameter)
+
+ for value in valid_values:
+ kwargs[parameter] = value
+ get_path(**kwargs)
+ # No exception should be raised
+
+ for value in invalid_values:
+ with self.assertRaises(ValueError):
+ kwargs[parameter] = value
+ path_or_list = get_path(**kwargs)
+ self.fail(
+ f"{value!r} unexpectedly passed validation: "
+ f"{method} returned {path_or_list!r}"
+ )
|