diff --git a/tests/rest/media/test_domain_blocking.py b/tests/rest/media/test_domain_blocking.py
new file mode 100644
index 0000000000..9beeeab843
--- /dev/null
+++ b/tests/rest/media/test_domain_blocking.py
@@ -0,0 +1,139 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import Dict
+
+from twisted.test.proto_helpers import MemoryReactor
+from twisted.web.resource import Resource
+
+from synapse.media._base import FileInfo
+from synapse.server import HomeServer
+from synapse.util import Clock
+
+from tests import unittest
+from tests.test_utils import SMALL_PNG
+from tests.unittest import override_config
+
+
+class MediaDomainBlockingTests(unittest.HomeserverTestCase):
+ remote_media_id = "doesnotmatter"
+ remote_server_name = "evil.com"
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.store = hs.get_datastores().main
+
+ # Inject a piece of media. We'll use this to ensure we're returning a sane
+ # response when we're not supposed to block it, distinguishing a media block
+ # from a regular 404.
+ file_id = "abcdefg12345"
+ file_info = FileInfo(server_name=self.remote_server_name, file_id=file_id)
+ with hs.get_media_repository().media_storage.store_into_file(file_info) as (
+ f,
+ fname,
+ finish,
+ ):
+ f.write(SMALL_PNG)
+ self.get_success(finish())
+
+ self.get_success(
+ self.store.store_cached_remote_media(
+ origin=self.remote_server_name,
+ media_id=self.remote_media_id,
+ media_type="image/png",
+ media_length=1,
+ time_now_ms=clock.time_msec(),
+ upload_name="test.png",
+ filesystem_id=file_id,
+ )
+ )
+
+ def create_resource_dict(self) -> Dict[str, Resource]:
+ # We need to manually set the resource tree to include media, the
+ # default only does `/_matrix/client` APIs.
+ return {"/_matrix/media": self.hs.get_media_repository_resource()}
+
+ @override_config(
+ {
+ # Disable downloads from the domain we'll be trying to download from.
+ # Should result in a 404.
+ "prevent_media_downloads_from": ["evil.com"]
+ }
+ )
+ def test_cannot_download_blocked_media(self) -> None:
+ """
+ Tests to ensure that remote media which is blocked cannot be downloaded.
+ """
+ response = self.make_request(
+ "GET",
+ f"/_matrix/media/v3/download/evil.com/{self.remote_media_id}",
+ shorthand=False,
+ )
+ self.assertEqual(response.code, 404)
+
+ @override_config(
+ {
+ # Disable downloads from a domain we won't be requesting downloads from.
+ # This proves we haven't broken anything.
+ "prevent_media_downloads_from": ["not-listed.com"]
+ }
+ )
+ def test_remote_media_normally_unblocked(self) -> None:
+ """
+ Tests to ensure that remote media is normally able to be downloaded
+ when no domain block is in place.
+ """
+ response = self.make_request(
+ "GET",
+ f"/_matrix/media/v3/download/evil.com/{self.remote_media_id}",
+ shorthand=False,
+ )
+ self.assertEqual(response.code, 200)
+
+ @override_config(
+ {
+ # Disable downloads from the domain we'll be trying to download from.
+ # Should result in a 404.
+ "prevent_media_downloads_from": ["evil.com"],
+ "dynamic_thumbnails": True,
+ }
+ )
+ def test_cannot_download_blocked_media_thumbnail(self) -> None:
+ """
+ Same test as test_cannot_download_blocked_media but for thumbnails.
+ """
+ response = self.make_request(
+ "GET",
+ f"/_matrix/media/v3/thumbnail/evil.com/{self.remote_media_id}?width=100&height=100",
+ shorthand=False,
+ content={"width": 100, "height": 100},
+ )
+ self.assertEqual(response.code, 404)
+
+ @override_config(
+ {
+ # Disable downloads from a domain we won't be requesting downloads from.
+ # This proves we haven't broken anything.
+ "prevent_media_downloads_from": ["not-listed.com"],
+ "dynamic_thumbnails": True,
+ }
+ )
+ def test_remote_media_thumbnail_normally_unblocked(self) -> None:
+ """
+ Same test as test_remote_media_normally_unblocked but for thumbnails.
+ """
+ response = self.make_request(
+ "GET",
+ f"/_matrix/media/v3/thumbnail/evil.com/{self.remote_media_id}?width=100&height=100",
+ shorthand=False,
+ )
+ self.assertEqual(response.code, 200)
|