Author:    Shay <hillerys@element.io>  2024-07-02 06:07:04 -0700
Committer: GitHub <noreply@github.com>  2024-07-02 14:07:04 +0100
Commit:    8f890447b0f8b6cbe369b162670185e8c746b2f2
Tree:      c8c290661a59b06257ce7e2fda19e799d83825eb /tests
Parent:    Fix sync waiting for an invalid token from the "future" (#17386)
Download:  synapse-8f890447b0f8b6cbe369b162670185e8c746b2f2.tar.xz

Support MSC3916 by adding `_matrix/client/v1/media/download` endpoint (#17365)
Diffstat (limited to 'tests')
-rw-r--r--  tests/federation/test_federation_media.py    35
-rw-r--r--  tests/http/test_client.py                   143
-rw-r--r--  tests/media/test_media_storage.py             14
-rw-r--r--  tests/replication/test_multi_media_repo.py   234
-rw-r--r--  tests/rest/client/test_media.py              609
5 files changed, 991 insertions, 44 deletions
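
The commit message above adds the authenticated `_matrix/client/v1/media/download` endpoint described by MSC3916. As a rough illustration of what the tests below exercise, here is a minimal client-side sketch of calling that endpoint; the homeserver URL, access token and media coordinates are placeholders, and the `requests` dependency is an assumption for the sketch, not something this change introduces.

import requests  # assumed available for this sketch; not part of the commit

HOMESERVER = "https://matrix.example.org"  # hypothetical homeserver
ACCESS_TOKEN = "syt_example_token"         # hypothetical access token


def download_media(server_name: str, media_id: str) -> bytes:
    """Fetch remote media via the authenticated client-server download endpoint."""
    resp = requests.get(
        f"{HOMESERVER}/_matrix/client/v1/media/download/{server_name}/{media_id}",
        headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.content
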
diff --git a/tests/federation/test_federation_media.py b/tests/federation/test_federation_media.py
index 2c396adbe3..142f73cfdb 100644
--- a/tests/federation/test_federation_media.py
+++ b/tests/federation/test_federation_media.py
@@ -36,10 +36,9 @@ from synapse.util import Clock
 
 from tests import unittest
 from tests.test_utils import SMALL_PNG
-from tests.unittest import override_config
 
 
-class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase):
+class FederationMediaDownloadsTest(unittest.FederatingHomeserverTestCase):
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         super().prepare(reactor, clock, hs)
@@ -65,9 +64,6 @@ class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase
         )
         self.media_repo = hs.get_media_repository()
 
-    @override_config(
-        {"experimental_features": {"msc3916_authenticated_media_enabled": True}}
-    )
     def test_file_download(self) -> None:
         content = io.BytesIO(b"file_to_stream")
         content_uri = self.get_success(
@@ -82,7 +78,7 @@ class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase
         # test with a text file
         channel = self.make_signed_federation_request(
             "GET",
-            f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
+            f"/_matrix/federation/v1/media/download/{content_uri.media_id}",
         )
         self.pump()
         self.assertEqual(200, channel.code)
@@ -106,7 +102,8 @@ class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase
 
         # check that the text file and expected value exist
         found_file = any(
-            "\r\nContent-Type: text/plain\r\n\r\nfile_to_stream" in field
+            "\r\nContent-Type: text/plain\r\nContent-Disposition: inline; filename=test_upload\r\n\r\nfile_to_stream"
+            in field
             for field in stripped
         )
         self.assertTrue(found_file)
@@ -124,7 +121,7 @@ class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase
         # test with an image file
         channel = self.make_signed_federation_request(
             "GET",
-            f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
+            f"/_matrix/federation/v1/media/download/{content_uri.media_id}",
         )
         self.pump()
         self.assertEqual(200, channel.code)
@@ -149,25 +146,3 @@ class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase
         # check that the png file exists and matches what was uploaded
         found_file = any(SMALL_PNG in field for field in stripped_bytes)
         self.assertTrue(found_file)
-
-    @override_config(
-        {"experimental_features": {"msc3916_authenticated_media_enabled": False}}
-    )
-    def test_disable_config(self) -> None:
-        content = io.BytesIO(b"file_to_stream")
-        content_uri = self.get_success(
-            self.media_repo.create_content(
-                "text/plain",
-                "test_upload",
-                content,
-                46,
-                UserID.from_string("@user_id:whatever.org"),
-            )
-        )
-        channel = self.make_signed_federation_request(
-            "GET",
-            f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
-        )
-        self.pump()
-        self.assertEqual(404, channel.code)
-        self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED")
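
The assertions above look for specific parts inside a multipart federation response. For orientation, a minimal sketch of the two-part multipart/mixed body shape these tests expect (a JSON metadata part followed by the file part); the helper name is hypothetical, and the boundary is simply the one reused throughout these tests rather than a value mandated by the server implementation.

BOUNDARY = "6067d4698f8d40a0a794ea7d7379d53a"  # same boundary string the tests use


def build_multipart_body(file_bytes: bytes, content_type: str, filename: str) -> bytes:
    """Assemble a JSON metadata part followed by a file part, multipart/mixed style."""
    head = (
        f"--{BOUNDARY}\r\n"
        "Content-Type: application/json\r\n\r\n"
        "{}\r\n"
        f"--{BOUNDARY}\r\n"
        f"Content-Type: {content_type}\r\n"
        f"Content-Disposition: inline; filename={filename}\r\n\r\n"
    ).encode()
    tail = f"\r\n--{BOUNDARY}--\r\n\r\n".encode()
    return head + file_bytes + tail


# e.g. build_multipart_body(b"file_to_stream", "text/plain", "test_upload")
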
diff --git a/tests/http/test_client.py b/tests/http/test_client.py
index a98091d711..721917f957 100644
--- a/tests/http/test_client.py
+++ b/tests/http/test_client.py
@@ -37,18 +37,155 @@ from synapse.http.client import (
     BlocklistingAgentWrapper,
     BlocklistingReactorWrapper,
     BodyExceededMaxSize,
+    MultipartResponse,
     _DiscardBodyWithMaxSizeProtocol,
+    _MultipartParserProtocol,
     read_body_with_max_size,
+    read_multipart_response,
 )
 
 from tests.server import FakeTransport, get_clock
 from tests.unittest import TestCase
 
 
+class ReadMultipartResponseTests(TestCase):
+    data1 = b"\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: application/json\r\n\r\n{}\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: text/plain\r\nContent-Disposition: inline; filename=test_upload\r\n\r\nfile_"
+    data2 = b"to_stream\r\n--6067d4698f8d40a0a794ea7d7379d53a--\r\n\r\n"
+
+    redirect_data = b"\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: application/json\r\n\r\n{}\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nLocation: https://cdn.example.org/ab/c1/2345.txt\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a--\r\n\r\n"
+
+    def _build_multipart_response(
+        self, response_length: Union[int, str], max_length: int
+    ) -> Tuple[
+        BytesIO,
+        "Deferred[MultipartResponse]",
+        _MultipartParserProtocol,
+    ]:
+        """Start reading the body, returns the response, result and proto"""
+        response = Mock(length=response_length)
+        result = BytesIO()
+        boundary = "6067d4698f8d40a0a794ea7d7379d53a"
+        deferred = read_multipart_response(response, result, boundary, max_length)
+
+        # Fish the protocol out of the response.
+        protocol = response.deliverBody.call_args[0][0]
+        protocol.transport = Mock()
+
+        return result, deferred, protocol
+
+    def _assert_error(
+        self,
+        deferred: "Deferred[MultipartResponse]",
+        protocol: _MultipartParserProtocol,
+    ) -> None:
+        """Ensure that the expected error is received."""
+        assert isinstance(deferred.result, Failure)
+        self.assertIsInstance(deferred.result.value, BodyExceededMaxSize)
+        assert protocol.transport is not None
+        # type-ignore: presumably abortConnection has been replaced with a Mock.
+        protocol.transport.abortConnection.assert_called_once()  # type: ignore[attr-defined]
+
+    def _cleanup_error(self, deferred: "Deferred[MultipartResponse]") -> None:
+        """Ensure that the error in the Deferred is handled gracefully."""
+        called = [False]
+
+        def errback(f: Failure) -> None:
+            called[0] = True
+
+        deferred.addErrback(errback)
+        self.assertTrue(called[0])
+
+    def test_parse_file(self) -> None:
+        """
+        Check that a multipart response containing a file is properly parsed
+        into its JSON and file parts, and that both are correctly captured.
+        """
+        result, deferred, protocol = self._build_multipart_response(249, 250)
+
+        # Start sending data.
+        protocol.dataReceived(self.data1)
+        protocol.dataReceived(self.data2)
+        # Close the connection.
+        protocol.connectionLost(Failure(ResponseDone()))
+
+        multipart_response: MultipartResponse = deferred.result  # type: ignore[assignment]
+
+        self.assertEqual(multipart_response.json, b"{}")
+        self.assertEqual(result.getvalue(), b"file_to_stream")
+        self.assertEqual(multipart_response.length, len(b"file_to_stream"))
+        self.assertEqual(multipart_response.content_type, b"text/plain")
+        self.assertEqual(
+            multipart_response.disposition, b"inline; filename=test_upload"
+        )
+
+    def test_parse_redirect(self) -> None:
+        """
+        Check that a multipart response containing a redirect is properly parsed and
+        the redirect URL is returned.
+        """
+        result, deferred, protocol = self._build_multipart_response(249, 250)
+
+        # Start sending data.
+        protocol.dataReceived(self.redirect_data)
+        # Close the connection.
+        protocol.connectionLost(Failure(ResponseDone()))
+
+        multipart_response: MultipartResponse = deferred.result  # type: ignore[assignment]
+
+        self.assertEqual(multipart_response.json, b"{}")
+        self.assertEqual(result.getvalue(), b"")
+        self.assertEqual(
+            multipart_response.url, b"https://cdn.example.org/ab/c1/2345.txt"
+        )
+
+    def test_too_large(self) -> None:
+        """A response which is too large raises an exception."""
+        result, deferred, protocol = self._build_multipart_response(UNKNOWN_LENGTH, 180)
+
+        # Start sending data.
+        protocol.dataReceived(self.data1)
+
+        self.assertEqual(result.getvalue(), b"file_")
+        self._assert_error(deferred, protocol)
+        self._cleanup_error(deferred)
+
+    def test_additional_data(self) -> None:
+        """A connection can receive data after being closed."""
+        result, deferred, protocol = self._build_multipart_response(UNKNOWN_LENGTH, 180)
+
+        # Start sending data.
+        protocol.dataReceived(self.data1)
+        self._assert_error(deferred, protocol)
+
+        # More data might have come in.
+        protocol.dataReceived(self.data2)
+
+        self.assertEqual(result.getvalue(), b"file_")
+        self._assert_error(deferred, protocol)
+        self._cleanup_error(deferred)
+
+    def test_content_length(self) -> None:
+        """The body shouldn't be read (at all) if the Content-Length header is too large."""
+        result, deferred, protocol = self._build_multipart_response(250, 1)
+
+        # Deferred shouldn't be called yet.
+        self.assertFalse(deferred.called)
+
+        # Start sending data.
+        protocol.dataReceived(self.data1)
+        self._assert_error(deferred, protocol)
+        self._cleanup_error(deferred)
+
+        # The data is never consumed.
+        self.assertEqual(result.getvalue(), b"")
+
+
 class ReadBodyWithMaxSizeTests(TestCase):
-    def _build_response(
-        self, length: Union[int, str] = UNKNOWN_LENGTH
-    ) -> Tuple[BytesIO, "Deferred[int]", _DiscardBodyWithMaxSizeProtocol]:
+    def _build_response(self, length: Union[int, str] = UNKNOWN_LENGTH) -> Tuple[
+        BytesIO,
+        "Deferred[int]",
+        _DiscardBodyWithMaxSizeProtocol,
+    ]:
         """Start reading the body, returns the response, result and proto"""
         response = Mock(length=length)
         result = BytesIO()
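
Judging solely from how the tests above drive it, `read_multipart_response` takes an `IResponse`, an output stream, the multipart boundary and a maximum length, and fires with a `MultipartResponse` once the body has been consumed. A rough usage sketch under those assumptions follows; the agent and URL are placeholders, and in practice the boundary would come from the response's Content-Type header rather than being hard-coded.

from io import BytesIO

from twisted.internet import defer

from synapse.http.client import MultipartResponse, read_multipart_response


@defer.inlineCallbacks
def fetch_multipart(agent, url: bytes, boundary: str, max_size: int):
    """Issue a GET and parse the multipart/mixed body into metadata plus file bytes."""
    response = yield agent.request(b"GET", url)
    output = BytesIO()
    parsed = yield read_multipart_response(response, output, boundary, max_size)
    # `parsed` carries the JSON part, content type, disposition and length;
    # the raw file bytes have been streamed into `output`.
    return parsed, output.getvalue()
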
diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py
index 46d20ce775..024086b775 100644
--- a/tests/media/test_media_storage.py
+++ b/tests/media/test_media_storage.py
@@ -129,7 +129,7 @@ class MediaStorageTests(unittest.HomeserverTestCase):
 
 
 @attr.s(auto_attribs=True, slots=True, frozen=True)
-class _TestImage:
+class TestImage:
     """An image for testing thumbnailing with the expected results
 
     Attributes:
@@ -158,7 +158,7 @@ class _TestImage:
     is_inline: bool = True
 
 
-small_png = _TestImage(
+small_png = TestImage(
     SMALL_PNG,
     b"image/png",
     b".png",
@@ -175,7 +175,7 @@ small_png = _TestImage(
     ),
 )
 
-small_png_with_transparency = _TestImage(
+small_png_with_transparency = TestImage(
     unhexlify(
         b"89504e470d0a1a0a0000000d49484452000000010000000101000"
         b"00000376ef9240000000274524e5300010194fdae0000000a4944"
@@ -188,7 +188,7 @@ small_png_with_transparency = _TestImage(
     # different versions of Pillow.
 )
 
-small_lossless_webp = _TestImage(
+small_lossless_webp = TestImage(
     unhexlify(
         b"524946461a000000574542505650384c0d0000002f0000001007" b"1011118888fe0700"
     ),
@@ -196,7 +196,7 @@ small_lossless_webp = _TestImage(
     b".webp",
 )
 
-empty_file = _TestImage(
+empty_file = TestImage(
     b"",
     b"image/gif",
     b".gif",
@@ -204,7 +204,7 @@ empty_file = _TestImage(
     unable_to_thumbnail=True,
 )
 
-SVG = _TestImage(
+SVG = TestImage(
     b"""<?xml version="1.0"?>
 <!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
   "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
@@ -236,7 +236,7 @@ urls = [
 @parameterized_class(("test_image", "url"), itertools.product(test_images, urls))
 class MediaRepoTests(unittest.HomeserverTestCase):
     servlets = [media.register_servlets]
-    test_image: ClassVar[_TestImage]
+    test_image: ClassVar[TestImage]
     hijack_auth = True
     user_id = "@test:user"
     url: ClassVar[str]
diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py
index 4927e45446..6fc4600c41 100644
--- a/tests/replication/test_multi_media_repo.py
+++ b/tests/replication/test_multi_media_repo.py
@@ -28,7 +28,7 @@ from twisted.web.http import HTTPChannel
 from twisted.web.server import Request
 
 from synapse.rest import admin
-from synapse.rest.client import login
+from synapse.rest.client import login, media
 from synapse.server import HomeServer
 from synapse.util import Clock
 
@@ -255,6 +255,238 @@ class MediaRepoShardTestCase(BaseMultiWorkerStreamTestCase):
         return sum(len(files) for _, _, files in os.walk(path))
 
 
+class AuthenticatedMediaRepoShardTestCase(BaseMultiWorkerStreamTestCase):
+    """Checks running multiple media repos work correctly using autheticated media paths"""
+
+    servlets = [
+        admin.register_servlets_for_client_rest_resource,
+        login.register_servlets,
+        media.register_servlets,
+    ]
+
+    file_data = b"\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: application/json\r\n\r\n{}\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: text/plain\r\nContent-Disposition: inline; filename=test_upload\r\n\r\nfile_to_stream\r\n--6067d4698f8d40a0a794ea7d7379d53a--\r\n\r\n"
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.user_id = self.register_user("user", "pass")
+        self.access_token = self.login("user", "pass")
+
+        self.reactor.lookups["example.com"] = "1.2.3.4"
+
+    def default_config(self) -> dict:
+        conf = super().default_config()
+        conf["federation_custom_ca_list"] = [get_test_ca_cert_file()]
+        return conf
+
+    def make_worker_hs(
+        self, worker_app: str, extra_config: Optional[dict] = None, **kwargs: Any
+    ) -> HomeServer:
+        worker_hs = super().make_worker_hs(worker_app, extra_config, **kwargs)
+        # Force the media paths onto the replication resource.
+        worker_hs.get_media_repository_resource().register_servlets(
+            self._hs_to_site[worker_hs].resource, worker_hs
+        )
+        return worker_hs
+
+    def _get_media_req(
+        self, hs: HomeServer, target: str, media_id: str
+    ) -> Tuple[FakeChannel, Request]:
+        """Request some remote media from the given HS by calling the download
+        API.
+
+        This then triggers an outbound request from the HS to the target.
+
+        Returns:
+            The channel for the *client* request and the *outbound* request for
+            the media which the caller should respond to.
+        """
+        channel = make_request(
+            self.reactor,
+            self._hs_to_site[hs],
+            "GET",
+            f"/_matrix/client/v1/media/download/{target}/{media_id}",
+            shorthand=False,
+            access_token=self.access_token,
+            await_result=False,
+        )
+        self.pump()
+
+        clients = self.reactor.tcpClients
+        self.assertGreaterEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients.pop()
+
+        # build the test server
+        server_factory = Factory.forProtocol(HTTPChannel)
+        # Request.finish expects the factory to have a 'log' method.
+        server_factory.log = _log_request
+
+        server_tls_protocol = wrap_server_factory_for_tls(
+            server_factory, self.reactor, sanlist=[b"DNS:example.com"]
+        ).buildProtocol(None)
+
+        # now, tell the client protocol factory to build the client protocol (it will be a
+        # _WrappingProtocol, around a TLSMemoryBIOProtocol, around an
+        # HTTP11ClientProtocol) and wire the output of said protocol up to the server via
+        # a FakeTransport.
+        #
+        # Normally this would be done by the TCP socket code in Twisted, but we are
+        # stubbing that out here.
+        client_protocol = client_factory.buildProtocol(None)
+        client_protocol.makeConnection(
+            FakeTransport(server_tls_protocol, self.reactor, client_protocol)
+        )
+
+        # tell the server tls protocol to send its stuff back to the client, too
+        server_tls_protocol.makeConnection(
+            FakeTransport(client_protocol, self.reactor, server_tls_protocol)
+        )
+
+        # fish the test server back out of the server-side TLS protocol.
+        http_server: HTTPChannel = server_tls_protocol.wrappedProtocol
+
+        # give the reactor a pump to get the TLS juices flowing.
+        self.reactor.pump((0.1,))
+
+        self.assertEqual(len(http_server.requests), 1)
+        request = http_server.requests[0]
+
+        self.assertEqual(request.method, b"GET")
+        self.assertEqual(
+            request.path,
+            f"/_matrix/federation/v1/media/download/{media_id}".encode(),
+        )
+        self.assertEqual(
+            request.requestHeaders.getRawHeaders(b"host"), [target.encode("utf-8")]
+        )
+
+        return channel, request
+
+    def test_basic(self) -> None:
+        """Test basic fetching of remote media from a single worker."""
+        hs1 = self.make_worker_hs("synapse.app.generic_worker")
+
+        channel, request = self._get_media_req(hs1, "example.com:443", "ABC123")
+
+        request.setResponseCode(200)
+        request.responseHeaders.setRawHeaders(
+            b"Content-Type",
+            ["multipart/mixed; boundary=6067d4698f8d40a0a794ea7d7379d53a"],
+        )
+        request.write(self.file_data)
+        request.finish()
+
+        self.pump(0.1)
+
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(channel.result["body"], b"file_to_stream")
+
+    def test_download_simple_file_race(self) -> None:
+        """Test that fetching remote media from two different processes at the
+        same time works.
+        """
+        hs1 = self.make_worker_hs("synapse.app.generic_worker")
+        hs2 = self.make_worker_hs("synapse.app.generic_worker")
+
+        start_count = self._count_remote_media()
+
+        # Make two requests without responding to the outbound media requests.
+        channel1, request1 = self._get_media_req(hs1, "example.com:443", "ABC123")
+        channel2, request2 = self._get_media_req(hs2, "example.com:443", "ABC123")
+
+        # Respond to the first outbound media request and check that the client
+        # request is successful
+        request1.setResponseCode(200)
+        request1.responseHeaders.setRawHeaders(
+            b"Content-Type",
+            ["multipart/mixed; boundary=6067d4698f8d40a0a794ea7d7379d53a"],
+        )
+        request1.write(self.file_data)
+        request1.finish()
+
+        self.pump(0.1)
+
+        self.assertEqual(channel1.code, 200, channel1.result["body"])
+        self.assertEqual(channel1.result["body"], b"file_to_stream")
+
+        # Now respond to the second with the same content.
+        request2.setResponseCode(200)
+        request2.responseHeaders.setRawHeaders(
+            b"Content-Type",
+            ["multipart/mixed; boundary=6067d4698f8d40a0a794ea7d7379d53a"],
+        )
+        request2.write(self.file_data)
+        request2.finish()
+
+        self.pump(0.1)
+
+        self.assertEqual(channel2.code, 200, channel2.result["body"])
+        self.assertEqual(channel2.result["body"], b"file_to_stream")
+
+        # We expect only one new file to have been persisted.
+        self.assertEqual(start_count + 1, self._count_remote_media())
+
+    def test_download_image_race(self) -> None:
+        """Test that fetching remote *images* from two different processes at
+        the same time works.
+
+        This checks that races generating thumbnails are handled correctly.
+        """
+        hs1 = self.make_worker_hs("synapse.app.generic_worker")
+        hs2 = self.make_worker_hs("synapse.app.generic_worker")
+
+        start_count = self._count_remote_thumbnails()
+
+        channel1, request1 = self._get_media_req(hs1, "example.com:443", "PIC1")
+        channel2, request2 = self._get_media_req(hs2, "example.com:443", "PIC1")
+
+        request1.setResponseCode(200)
+        request1.responseHeaders.setRawHeaders(
+            b"Content-Type",
+            ["multipart/mixed; boundary=6067d4698f8d40a0a794ea7d7379d53a"],
+        )
+        img_data = b"\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: application/json\r\n\r\n{}\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: image/png\r\nContent-Disposition: inline; filename=test_img\r\n\r\n"
+        request1.write(img_data)
+        request1.write(SMALL_PNG)
+        request1.write(b"\r\n--6067d4698f8d40a0a794ea7d7379d53a--\r\n\r\n")
+        request1.finish()
+
+        self.pump(0.1)
+
+        self.assertEqual(channel1.code, 200, channel1.result["body"])
+        self.assertEqual(channel1.result["body"], SMALL_PNG)
+
+        request2.setResponseCode(200)
+        request2.responseHeaders.setRawHeaders(
+            b"Content-Type",
+            ["multipart/mixed; boundary=6067d4698f8d40a0a794ea7d7379d53a"],
+        )
+        request2.write(img_data)
+        request2.write(SMALL_PNG)
+        request2.write(b"\r\n--6067d4698f8d40a0a794ea7d7379d53a--\r\n\r\n")
+        request2.finish()
+
+        self.pump(0.1)
+
+        self.assertEqual(channel2.code, 200, channel2.result["body"])
+        self.assertEqual(channel2.result["body"], SMALL_PNG)
+
+        # We expect only three new thumbnails to have been persisted.
+        self.assertEqual(start_count + 3, self._count_remote_thumbnails())
+
+    def _count_remote_media(self) -> int:
+        """Count the number of files in our remote media directory."""
+        path = os.path.join(
+            self.hs.get_media_repository().primary_base_path, "remote_content"
+        )
+        return sum(len(files) for _, _, files in os.walk(path))
+
+    def _count_remote_thumbnails(self) -> int:
+        """Count the number of files in our remote thumbnails directory."""
+        path = os.path.join(
+            self.hs.get_media_repository().primary_base_path, "remote_thumbnail"
+        )
+        return sum(len(files) for _, _, files in os.walk(path))
+
+
 def _log_request(request: Request) -> None:
     """Implements Factory.log, which is expected by Request.finish"""
     logger.info("Completed request %s", request)
diff --git a/tests/rest/client/test_media.py b/tests/rest/client/test_media.py
index be4a289ec1..6b5af2dbb6 100644
--- a/tests/rest/client/test_media.py
+++ b/tests/rest/client/test_media.py
@@ -19,31 +19,54 @@
 #
 #
 import base64
+import io
 import json
 import os
 import re
-from typing import Any, Dict, Optional, Sequence, Tuple, Type
+from typing import Any, BinaryIO, ClassVar, Dict, List, Optional, Sequence, Tuple, Type
+from unittest.mock import MagicMock, Mock, patch
+from urllib import parse
 from urllib.parse import quote, urlencode
 
+from parameterized import parameterized_class
+
+from twisted.internet import defer
 from twisted.internet._resolver import HostResolution
 from twisted.internet.address import IPv4Address, IPv6Address
+from twisted.internet.defer import Deferred
 from twisted.internet.error import DNSLookupError
 from twisted.internet.interfaces import IAddress, IResolutionReceiver
+from twisted.python.failure import Failure
 from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactor
+from twisted.web.http_headers import Headers
+from twisted.web.iweb import UNKNOWN_LENGTH, IResponse
 from twisted.web.resource import Resource
 
+from synapse.api.errors import HttpResponseException
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.config.oembed import OEmbedEndpointConfig
+from synapse.http.client import MultipartResponse
+from synapse.http.types import QueryParams
+from synapse.logging.context import make_deferred_yieldable
 from synapse.media._base import FileInfo
 from synapse.media.url_previewer import IMAGE_CACHE_EXPIRY_MS
 from synapse.rest import admin
 from synapse.rest.client import login, media
 from synapse.server import HomeServer
-from synapse.types import JsonDict
+from synapse.types import JsonDict, UserID
 from synapse.util import Clock
 from synapse.util.stringutils import parse_and_validate_mxc_uri
 
 from tests import unittest
-from tests.server import FakeTransport, ThreadedMemoryReactorClock
+from tests.media.test_media_storage import (
+    SVG,
+    TestImage,
+    empty_file,
+    small_lossless_webp,
+    small_png,
+    small_png_with_transparency,
+)
+from tests.server import FakeChannel, FakeTransport, ThreadedMemoryReactorClock
 from tests.test_utils import SMALL_PNG
 from tests.unittest import override_config
 
@@ -1607,3 +1630,583 @@ class UnstableMediaConfigTest(unittest.HomeserverTestCase):
         self.assertEqual(
             channel.json_body["m.upload.size"], self.hs.config.media.max_upload_size
         )
+
+
+class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        media.register_servlets,
+        login.register_servlets,
+        admin.register_servlets,
+    ]
+
+    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
+        config = self.default_config()
+
+        self.storage_path = self.mktemp()
+        self.media_store_path = self.mktemp()
+        os.mkdir(self.storage_path)
+        os.mkdir(self.media_store_path)
+        config["media_store_path"] = self.media_store_path
+
+        provider_config = {
+            "module": "synapse.media.storage_provider.FileStorageProviderBackend",
+            "store_local": True,
+            "store_synchronous": False,
+            "store_remote": True,
+            "config": {"directory": self.storage_path},
+        }
+
+        config["media_storage_providers"] = [provider_config]
+
+        return self.setup_test_homeserver(config=config)
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.repo = hs.get_media_repository()
+        self.client = hs.get_federation_http_client()
+        self.store = hs.get_datastores().main
+        self.user = self.register_user("user", "pass")
+        self.tok = self.login("user", "pass")
+
+    # mock actually reading file body
+    def read_multipart_response_30MiB(*args: Any, **kwargs: Any) -> Deferred:
+        d: Deferred = defer.Deferred()
+        d.callback(MultipartResponse(b"{}", 31457280, b"img/png", None))
+        return d
+
+    def read_multipart_response_50MiB(*args: Any, **kwargs: Any) -> Deferred:
+        d: Deferred = defer.Deferred()
+        d.callback(MultipartResponse(b"{}", 52428800, b"img/png", None))
+        return d
+
+    @patch(
+        "synapse.http.matrixfederationclient.read_multipart_response",
+        read_multipart_response_30MiB,
+    )
+    def test_download_ratelimit_default(self) -> None:
+        """
+        Test remote media download ratelimiting against the default configuration: a 500MiB
+        bucket and an 87KiB/second drain rate
+        """
+
+        # mock out actually sending the request, returns a 30MiB response
+        async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
+            resp = MagicMock(spec=IResponse)
+            resp.code = 200
+            resp.length = 31457280
+            resp.headers = Headers(
+                {"Content-Type": ["multipart/mixed; boundary=gc0p4Jq0M2Yt08jU534c0p"]}
+            )
+            resp.phrase = b"OK"
+            return resp
+
+        self.client._send_request = _send_request  # type: ignore
+
+        # first request should go through
+        channel = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/remote.org/abc",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        assert channel.code == 200
+
+        # next 15 should go through
+        for i in range(15):
+            channel2 = self.make_request(
+                "GET",
+                f"/_matrix/client/v1/media/download/remote.org/abc{i}",
+                shorthand=False,
+                access_token=self.tok,
+            )
+            assert channel2.code == 200
+
+        # 17th will hit ratelimit
+        channel3 = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/remote.org/abcd",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        assert channel3.code == 429
+
+        # however, a request from a different IP will go through
+        channel4 = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/remote.org/abcde",
+            shorthand=False,
+            client_ip="187.233.230.159",
+            access_token=self.tok,
+        )
+        assert channel4.code == 200
+
+        # At 87KiB/s it should take about 2 minutes for enough to drain from the bucket
+        # that another 30MiB download is authorized. The last download was blocked with
+        # the bucket at 503,316,480. The next download will be authorized when the bucket
+        # drops to 492,830,720 (524,288,000 total capacity - 31,457,280 download size), so
+        # 503,316,480 - 492,830,720 = 10,485,760 needs to drain before another download is
+        # authorized, which takes roughly 2 minutes (10,485,760 / 89,088 / 60).
+        self.reactor.pump([2.0 * 60.0])
+
+        # enough has drained and next request goes through
+        channel5 = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/remote.org/abcdef",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        assert channel5.code == 200
+
+    @override_config(
+        {
+            "remote_media_download_per_second": "50M",
+            "remote_media_download_burst_count": "50M",
+        }
+    )
+    @patch(
+        "synapse.http.matrixfederationclient.read_multipart_response",
+        read_multipart_response_50MiB,
+    )
+    def test_download_rate_limit_config(self) -> None:
+        """
+        Test that download rate limit config options are correctly picked up and applied
+        """
+
+        async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
+            resp = MagicMock(spec=IResponse)
+            resp.code = 200
+            resp.length = 52428800
+            resp.headers = Headers(
+                {"Content-Type": ["multipart/mixed; boundary=gc0p4Jq0M2Yt08jU534c0p"]}
+            )
+            resp.phrase = b"OK"
+            return resp
+
+        self.client._send_request = _send_request  # type: ignore
+
+        # first request should go through
+        channel = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/remote.org/abc",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        assert channel.code == 200
+
+        # immediate second request should fail
+        channel = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/remote.org/abcd",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        assert channel.code == 429
+
+        # advance half a second
+        self.reactor.pump([0.5])
+
+        # request still fails
+        channel = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/remote.org/abcde",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        assert channel.code == 429
+
+        # advance another half second
+        self.reactor.pump([0.5])
+
+        # enough has drained from bucket and request is successful
+        channel = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/remote.org/abcdef",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        assert channel.code == 200
+
+    @patch(
+        "synapse.http.matrixfederationclient.read_multipart_response",
+        read_multipart_response_30MiB,
+    )
+    def test_download_ratelimit_max_size_sub(self) -> None:
+        """
+        Test that if no content-length is provided, the default max size is applied instead
+        """
+
+        # mock out actually sending the request
+        async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
+            resp = MagicMock(spec=IResponse)
+            resp.code = 200
+            resp.length = UNKNOWN_LENGTH
+            resp.headers = Headers(
+                {"Content-Type": ["multipart/mixed; boundary=gc0p4Jq0M2Yt08jU534c0p"]}
+            )
+            resp.phrase = b"OK"
+            return resp
+
+        self.client._send_request = _send_request  # type: ignore
+
+        # ten requests should go through using the max size (500MB/50MB)
+        for i in range(10):
+            channel2 = self.make_request(
+                "GET",
+                f"/_matrix/client/v1/media/download/remote.org/abc{i}",
+                shorthand=False,
+                access_token=self.tok,
+            )
+            assert channel2.code == 200
+
+        # eleventh will hit ratelimit
+        channel3 = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/remote.org/abcd",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        assert channel3.code == 429
+
+    def test_file_download(self) -> None:
+        content = io.BytesIO(b"file_to_stream")
+        content_uri = self.get_success(
+            self.repo.create_content(
+                "text/plain",
+                "test_upload",
+                content,
+                46,
+                UserID.from_string("@user_id:whatever.org"),
+            )
+        )
+        # test with a text file
+        channel = self.make_request(
+            "GET",
+            f"/_matrix/client/v1/media/download/test/{content_uri.media_id}",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        self.pump()
+        self.assertEqual(200, channel.code)
+
+
+test_images = [
+    small_png,
+    small_png_with_transparency,
+    small_lossless_webp,
+    empty_file,
+    SVG,
+]
+input_values = [(x,) for x in test_images]
+
+
+@parameterized_class(("test_image",), input_values)
+class DownloadTestCase(unittest.HomeserverTestCase):
+    test_image: ClassVar[TestImage]
+    servlets = [
+        media.register_servlets,
+        login.register_servlets,
+        admin.register_servlets,
+    ]
+
+    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
+        self.fetches: List[
+            Tuple[
+                "Deferred[Any]",
+                str,
+                str,
+                Optional[QueryParams],
+            ]
+        ] = []
+
+        def federation_get_file(
+            destination: str,
+            path: str,
+            output_stream: BinaryIO,
+            download_ratelimiter: Ratelimiter,
+            ip_address: Any,
+            max_size: int,
+            args: Optional[QueryParams] = None,
+            retry_on_dns_fail: bool = True,
+            ignore_backoff: bool = False,
+            follow_redirects: bool = False,
+        ) -> "Deferred[Tuple[int, Dict[bytes, List[bytes]], bytes]]":
+            """A mock for MatrixFederationHttpClient.federation_get_file."""
+
+            def write_to(
+                r: Tuple[bytes, Tuple[int, Dict[bytes, List[bytes]], bytes]]
+            ) -> Tuple[int, Dict[bytes, List[bytes]], bytes]:
+                data, response = r
+                output_stream.write(data)
+                return response
+
+            def write_err(f: Failure) -> Failure:
+                f.trap(HttpResponseException)
+                output_stream.write(f.value.response)
+                return f
+
+            d: Deferred[Tuple[bytes, Tuple[int, Dict[bytes, List[bytes]], bytes]]] = (
+                Deferred()
+            )
+            self.fetches.append((d, destination, path, args))
+            # Note that this callback changes the value held by d.
+            d_after_callback = d.addCallbacks(write_to, write_err)
+            return make_deferred_yieldable(d_after_callback)
+
+        def get_file(
+            destination: str,
+            path: str,
+            output_stream: BinaryIO,
+            download_ratelimiter: Ratelimiter,
+            ip_address: Any,
+            max_size: int,
+            args: Optional[QueryParams] = None,
+            retry_on_dns_fail: bool = True,
+            ignore_backoff: bool = False,
+            follow_redirects: bool = False,
+        ) -> "Deferred[Tuple[int, Dict[bytes, List[bytes]]]]":
+            """A mock for MatrixFederationHttpClient.get_file."""
+
+            def write_to(
+                r: Tuple[bytes, Tuple[int, Dict[bytes, List[bytes]]]]
+            ) -> Tuple[int, Dict[bytes, List[bytes]]]:
+                data, response = r
+                output_stream.write(data)
+                return response
+
+            def write_err(f: Failure) -> Failure:
+                f.trap(HttpResponseException)
+                output_stream.write(f.value.response)
+                return f
+
+            d: Deferred[Tuple[bytes, Tuple[int, Dict[bytes, List[bytes]]]]] = Deferred()
+            self.fetches.append((d, destination, path, args))
+            # Note that this callback changes the value held by d.
+            d_after_callback = d.addCallbacks(write_to, write_err)
+            return make_deferred_yieldable(d_after_callback)
+
+        # Mock out the homeserver's MatrixFederationHttpClient
+        client = Mock()
+        client.federation_get_file = federation_get_file
+        client.get_file = get_file
+
+        self.storage_path = self.mktemp()
+        self.media_store_path = self.mktemp()
+        os.mkdir(self.storage_path)
+        os.mkdir(self.media_store_path)
+
+        config = self.default_config()
+        config["media_store_path"] = self.media_store_path
+        config["max_image_pixels"] = 2000000
+
+        provider_config = {
+            "module": "synapse.media.storage_provider.FileStorageProviderBackend",
+            "store_local": True,
+            "store_synchronous": False,
+            "store_remote": True,
+            "config": {"directory": self.storage_path},
+        }
+        config["media_storage_providers"] = [provider_config]
+        config["experimental_features"] = {"msc3916_authenticated_media_enabled": True}
+
+        hs = self.setup_test_homeserver(config=config, federation_http_client=client)
+
+        return hs
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.media_repo = hs.get_media_repository()
+
+        self.remote = "example.com"
+        self.media_id = "12345"
+
+        self.user = self.register_user("user", "pass")
+        self.tok = self.login("user", "pass")
+
+    def _req(
+        self, content_disposition: Optional[bytes], include_content_type: bool = True
+    ) -> FakeChannel:
+        channel = self.make_request(
+            "GET",
+            f"/_matrix/client/v1/media/download/{self.remote}/{self.media_id}",
+            shorthand=False,
+            await_result=False,
+            access_token=self.tok,
+        )
+        self.pump()
+
+        # We've made one fetch, to example.com, using the federation media URL
+        self.assertEqual(len(self.fetches), 1)
+        self.assertEqual(self.fetches[0][1], "example.com")
+        self.assertEqual(
+            self.fetches[0][2], "/_matrix/federation/v1/media/download/" + self.media_id
+        )
+        self.assertEqual(
+            self.fetches[0][3],
+            {"timeout_ms": "20000"},
+        )
+
+        headers = {
+            b"Content-Length": [b"%d" % (len(self.test_image.data))],
+        }
+
+        if include_content_type:
+            headers[b"Content-Type"] = [self.test_image.content_type]
+
+        if content_disposition:
+            headers[b"Content-Disposition"] = [content_disposition]
+
+        self.fetches[0][0].callback(
+            (self.test_image.data, (len(self.test_image.data), headers, b"{}"))
+        )
+
+        self.pump()
+        self.assertEqual(channel.code, 200)
+
+        return channel
+
+    def test_handle_missing_content_type(self) -> None:
+        channel = self._req(
+            b"attachment; filename=out" + self.test_image.extension,
+            include_content_type=False,
+        )
+        headers = channel.headers
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(
+            headers.getRawHeaders(b"Content-Type"), [b"application/octet-stream"]
+        )
+
+    def test_disposition_filename_ascii(self) -> None:
+        """
+        If the filename is filename=<ascii> then Synapse will decode it as an
+        ASCII string, and use filename= in the response.
+        """
+        channel = self._req(b"attachment; filename=out" + self.test_image.extension)
+
+        headers = channel.headers
+        self.assertEqual(
+            headers.getRawHeaders(b"Content-Type"), [self.test_image.content_type]
+        )
+        self.assertEqual(
+            headers.getRawHeaders(b"Content-Disposition"),
+            [
+                (b"inline" if self.test_image.is_inline else b"attachment")
+                + b"; filename=out"
+                + self.test_image.extension
+            ],
+        )
+
+    def test_disposition_filenamestar_utf8escaped(self) -> None:
+        """
+        If the filename is filename*=utf-8''<utf-8 escaped> then Synapse will
+        correctly decode it as the UTF-8 string, and use filename* in the
+        response.
+        """
+        filename = parse.quote("\u2603".encode()).encode("ascii")
+        channel = self._req(
+            b"attachment; filename*=utf-8''" + filename + self.test_image.extension
+        )
+
+        headers = channel.headers
+        self.assertEqual(
+            headers.getRawHeaders(b"Content-Type"), [self.test_image.content_type]
+        )
+        self.assertEqual(
+            headers.getRawHeaders(b"Content-Disposition"),
+            [
+                (b"inline" if self.test_image.is_inline else b"attachment")
+                + b"; filename*=utf-8''"
+                + filename
+                + self.test_image.extension
+            ],
+        )
+
+    def test_disposition_none(self) -> None:
+        """
+        If there is no filename, Content-Disposition should only
+        be a disposition type.
+        """
+        channel = self._req(None)
+
+        headers = channel.headers
+        self.assertEqual(
+            headers.getRawHeaders(b"Content-Type"), [self.test_image.content_type]
+        )
+        self.assertEqual(
+            headers.getRawHeaders(b"Content-Disposition"),
+            [b"inline" if self.test_image.is_inline else b"attachment"],
+        )
+
+    def test_x_robots_tag_header(self) -> None:
+        """
+        Tests that the `X-Robots-Tag` header is present, which informs web crawlers
+        to not index, archive, or follow links in media.
+        """
+        channel = self._req(b"attachment; filename=out" + self.test_image.extension)
+
+        headers = channel.headers
+        self.assertEqual(
+            headers.getRawHeaders(b"X-Robots-Tag"),
+            [b"noindex, nofollow, noarchive, noimageindex"],
+        )
+
+    def test_cross_origin_resource_policy_header(self) -> None:
+        """
+        Test that the Cross-Origin-Resource-Policy header is set to "cross-origin"
+        allowing web clients to embed media from the downloads API.
+        """
+        channel = self._req(b"attachment; filename=out" + self.test_image.extension)
+
+        headers = channel.headers
+
+        self.assertEqual(
+            headers.getRawHeaders(b"Cross-Origin-Resource-Policy"),
+            [b"cross-origin"],
+        )
+
+    def test_unknown_federation_endpoint(self) -> None:
+        """
+        Test that if the download request to the remote federation endpoint returns a 404
+        we fall back to the _matrix/media endpoint
+        """
+        channel = self.make_request(
+            "GET",
+            f"/_matrix/client/v1/media/download/{self.remote}/{self.media_id}",
+            shorthand=False,
+            await_result=False,
+            access_token=self.tok,
+        )
+        self.pump()
+
+        # We've made one fetch, to example.com, using the federation media
+        # download URL
+        self.assertEqual(len(self.fetches), 1)
+        self.assertEqual(self.fetches[0][1], "example.com")
+        self.assertEqual(
+            self.fetches[0][2], f"/_matrix/federation/v1/media/download/{self.media_id}"
+        )
+
+        # The result which says the endpoint is unknown.
+        unknown_endpoint = b'{"errcode":"M_UNRECOGNIZED","error":"Unknown request"}'
+        self.fetches[0][0].errback(
+            HttpResponseException(404, "NOT FOUND", unknown_endpoint)
+        )
+
+        self.pump()
+
+        # There should now be another request to the _matrix/media/v3/download URL.
+        self.assertEqual(len(self.fetches), 2)
+        self.assertEqual(self.fetches[1][1], "example.com")
+        self.assertEqual(
+            self.fetches[1][2],
+            f"/_matrix/media/v3/download/example.com/{self.media_id}",
+        )
+
+        headers = {
+            b"Content-Length": [b"%d" % (len(self.test_image.data))],
+        }
+
+        self.fetches[1][0].callback(
+            (self.test_image.data, (len(self.test_image.data), headers))
+        )
+
+        self.pump()
+        self.assertEqual(channel.code, 200)
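
Finally, the bucket arithmetic in the `test_download_ratelimit_default` comment earlier in this diff can be restated as a short calculation; the figures below are taken from that comment and from the mocked 30MiB responses, with the drain rate and bucket size as the test describes them.

# Worked numbers behind the "about 2 minutes" estimate in test_download_ratelimit_default.
BUCKET_CAPACITY = 500 * 1024 * 1024    # 524,288,000 bytes - burst bucket per the test
DOWNLOAD_SIZE = 30 * 1024 * 1024       # 31,457,280 bytes - each mocked response
DRAIN_RATE = 87 * 1024                 # 89,088 bytes/second - drain rate per the test

blocked_at = 16 * DOWNLOAD_SIZE                  # 503,316,480 after 16 successful downloads
authorized_at = BUCKET_CAPACITY - DOWNLOAD_SIZE  # 492,830,720
to_drain = blocked_at - authorized_at            # 10,485,760
seconds_needed = to_drain / DRAIN_RATE           # ~117.7 seconds, i.e. roughly 2 minutes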