summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/6681.feature1
-rw-r--r--docs/admin_api/media_admin_api.md76
-rw-r--r--docs/workers.md4
-rw-r--r--synapse/rest/admin/media.py68
-rw-r--r--synapse/storage/data_stores/main/room.py116
-rw-r--r--tests/rest/admin/test_admin.py341
-rw-r--r--tests/rest/client/v1/utils.py37
7 files changed, 632 insertions, 11 deletions
diff --git a/changelog.d/6681.feature b/changelog.d/6681.feature
new file mode 100644
index 0000000000..5cf19a4e0e
--- /dev/null
+++ b/changelog.d/6681.feature
@@ -0,0 +1 @@
+Add new quarantine media admin APIs to quarantine by media ID or by user who uploaded the media.
diff --git a/docs/admin_api/media_admin_api.md b/docs/admin_api/media_admin_api.md
index 8b3666d5f5..46ba7a1a71 100644
--- a/docs/admin_api/media_admin_api.md
+++ b/docs/admin_api/media_admin_api.md
@@ -22,19 +22,81 @@ It returns a JSON body like the following:
 }
 ```
 
-# Quarantine media in a room
+# Quarantine media
 
-This API 'quarantines' all the media in a room.
+Quarantining media means that it is marked as inaccessible by users. It applies
+to any local media, and any locally-cached copies of remote media.
 
-The API is:
+The media file itself (and any thumbnails) is not deleted from the server.
+
+## Quarantining media by ID
+
+This API quarantines a single piece of local or remote media.
+
+Request:
 
 ```
-POST /_synapse/admin/v1/quarantine_media/<room_id>
+POST /_synapse/admin/v1/media/quarantine/<server_name>/<media_id>
 
 {}
 ```
 
-Quarantining media means that it is marked as inaccessible by users. It applies
-to any local media, and any locally-cached copies of remote media.
+Where `server_name` is in the form of `example.org`, and `media_id` is in the
+form of `abcdefg12345...`.
+
+Response:
+
+```
+{}
+```
+
+## Quarantining media in a room
+
+This API quarantines all local and remote media in a room.
+
+Request:
+
+```
+POST /_synapse/admin/v1/room/<room_id>/media/quarantine
+
+{}
+```
+
+Where `room_id` is in the form of `!roomid12345:example.org`.
+
+Response:
+
+```
+{
+  "num_quarantined": 10  # The number of media items successfully quarantined
+}
+```
+
+Note that there is a legacy endpoint, `POST
+/_synapse/admin/v1/quarantine_media/<room_id >`, that operates the same.
+However, it is deprecated and may be removed in a future release.
+
+## Quarantining all media of a user
+
+This API quarantines all *local* media that a *local* user has uploaded. That is to say, if
+you would like to quarantine media uploaded by a user on a remote homeserver, you should
+instead use one of the other APIs.
+
+Request:
+
+```
+POST /_synapse/admin/v1/user/<user_id>/media/quarantine
+
+{}
+```
+
+Where `user_id` is in the form of `@bob:example.org`.
+
+Response:
+
+```
+{
+  "num_quarantined": 10  # The number of media items successfully quarantined
+}
+```
 
-The media file itself (and any thumbnails) is not deleted from the server.
diff --git a/docs/workers.md b/docs/workers.md
index f4283aeb05..0ab269fd96 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -202,7 +202,9 @@ Handles the media repository. It can handle all endpoints starting with:
 ... and the following regular expressions matching media-specific administration APIs:
 
     ^/_synapse/admin/v1/purge_media_cache$
-    ^/_synapse/admin/v1/room/.*/media$
+    ^/_synapse/admin/v1/room/.*/media.*$
+    ^/_synapse/admin/v1/user/.*/media.*$
+    ^/_synapse/admin/v1/media/.*$
     ^/_synapse/admin/v1/quarantine_media/.*$
 
 You should also set `enable_media_repo: False` in the shared configuration
diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py
index fa833e54cf..3a445d6eed 100644
--- a/synapse/rest/admin/media.py
+++ b/synapse/rest/admin/media.py
@@ -32,16 +32,24 @@ class QuarantineMediaInRoom(RestServlet):
     this server.
     """
 
-    PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
+    PATTERNS = (
+        historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media/quarantine")
+        +
+        # This path kept around for legacy reasons
+        historical_admin_path_patterns("/quarantine_media/(?P<room_id>![^/]+)")
+    )
 
     def __init__(self, hs):
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
 
-    async def on_POST(self, request, room_id):
+    async def on_POST(self, request, room_id: str):
         requester = await self.auth.get_user_by_req(request)
         await assert_user_is_admin(self.auth, requester.user)
 
+        logging.info("Quarantining room: %s", room_id)
+
+        # Quarantine all media in this room
         num_quarantined = await self.store.quarantine_media_ids_in_room(
             room_id, requester.user.to_string()
         )
@@ -49,6 +57,60 @@ class QuarantineMediaInRoom(RestServlet):
         return 200, {"num_quarantined": num_quarantined}
 
 
+class QuarantineMediaByUser(RestServlet):
+    """Quarantines all local media by a given user so that no one can download it via
+    this server.
+    """
+
+    PATTERNS = historical_admin_path_patterns(
+        "/user/(?P<user_id>[^/]+)/media/quarantine"
+    )
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+
+    async def on_POST(self, request, user_id: str):
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        logging.info("Quarantining local media by user: %s", user_id)
+
+        # Quarantine all media this user has uploaded
+        num_quarantined = await self.store.quarantine_media_ids_by_user(
+            user_id, requester.user.to_string()
+        )
+
+        return 200, {"num_quarantined": num_quarantined}
+
+
+class QuarantineMediaByID(RestServlet):
+    """Quarantines local or remote media by a given ID so that no one can download
+    it via this server.
+    """
+
+    PATTERNS = historical_admin_path_patterns(
+        "/media/quarantine/(?P<server_name>[^/]+)/(?P<media_id>[^/]+)"
+    )
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+
+    async def on_POST(self, request, server_name: str, media_id: str):
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        logging.info("Quarantining local media by ID: %s/%s", server_name, media_id)
+
+        # Quarantine this media id
+        await self.store.quarantine_media_by_id(
+            server_name, media_id, requester.user.to_string()
+        )
+
+        return 200, {}
+
+
 class ListMediaInRoom(RestServlet):
     """Lists all of the media in a given room.
     """
@@ -94,4 +156,6 @@ def register_servlets_for_media_repo(hs, http_server):
     """
     PurgeMediaCacheRestServlet(hs).register(http_server)
     QuarantineMediaInRoom(hs).register(http_server)
+    QuarantineMediaByID(hs).register(http_server)
+    QuarantineMediaByUser(hs).register(http_server)
     ListMediaInRoom(hs).register(http_server)
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 8636d75030..49bab62be3 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -18,7 +18,7 @@ import collections
 import logging
 import re
 from abc import abstractmethod
-from typing import Optional, Tuple
+from typing import List, Optional, Tuple
 
 from six import integer_types
 
@@ -399,6 +399,8 @@ class RoomWorkerStore(SQLBaseStore):
         the associated media
         """
 
+        logger.info("Quarantining media in room: %s", room_id)
+
         def _quarantine_media_in_room_txn(txn):
             local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
             total_media_quarantined = 0
@@ -494,6 +496,118 @@ class RoomWorkerStore(SQLBaseStore):
 
         return local_media_mxcs, remote_media_mxcs
 
+    def quarantine_media_by_id(
+        self, server_name: str, media_id: str, quarantined_by: str,
+    ):
+        """quarantines a single local or remote media id
+
+        Args:
+            server_name: The name of the server that holds this media
+            media_id: The ID of the media to be quarantined
+            quarantined_by: The user ID that initiated the quarantine request
+        """
+        logger.info("Quarantining media: %s/%s", server_name, media_id)
+        is_local = server_name == self.config.server_name
+
+        def _quarantine_media_by_id_txn(txn):
+            local_mxcs = [media_id] if is_local else []
+            remote_mxcs = [(server_name, media_id)] if not is_local else []
+
+            return self._quarantine_media_txn(
+                txn, local_mxcs, remote_mxcs, quarantined_by
+            )
+
+        return self.db.runInteraction(
+            "quarantine_media_by_user", _quarantine_media_by_id_txn
+        )
+
+    def quarantine_media_ids_by_user(self, user_id: str, quarantined_by: str):
+        """quarantines all local media associated with a single user
+
+        Args:
+            user_id: The ID of the user to quarantine media of
+            quarantined_by: The ID of the user who made the quarantine request
+        """
+
+        def _quarantine_media_by_user_txn(txn):
+            local_media_ids = self._get_media_ids_by_user_txn(txn, user_id)
+            return self._quarantine_media_txn(txn, local_media_ids, [], quarantined_by)
+
+        return self.db.runInteraction(
+            "quarantine_media_by_user", _quarantine_media_by_user_txn
+        )
+
+    def _get_media_ids_by_user_txn(self, txn, user_id: str, filter_quarantined=True):
+        """Retrieves local media IDs by a given user
+
+        Args:
+            txn (cursor)
+            user_id: The ID of the user to retrieve media IDs of
+
+        Returns:
+            The local and remote media as a lists of tuples where the key is
+            the hostname and the value is the media ID.
+        """
+        # Local media
+        sql = """
+            SELECT media_id
+            FROM local_media_repository
+            WHERE user_id = ?
+            """
+        if filter_quarantined:
+            sql += "AND quarantined_by IS NULL"
+        txn.execute(sql, (user_id,))
+
+        local_media_ids = [row[0] for row in txn]
+
+        # TODO: Figure out all remote media a user has referenced in a message
+
+        return local_media_ids
+
+    def _quarantine_media_txn(
+        self,
+        txn,
+        local_mxcs: List[str],
+        remote_mxcs: List[Tuple[str, str]],
+        quarantined_by: str,
+    ) -> int:
+        """Quarantine local and remote media items
+
+        Args:
+            txn (cursor)
+            local_mxcs: A list of local mxc URLs
+            remote_mxcs: A list of (remote server, media id) tuples representing
+                remote mxc URLs
+            quarantined_by: The ID of the user who initiated the quarantine request
+        Returns:
+            The total number of media items quarantined
+        """
+        total_media_quarantined = 0
+
+        # Update all the tables to set the quarantined_by flag
+        txn.executemany(
+            """
+            UPDATE local_media_repository
+            SET quarantined_by = ?
+            WHERE media_id = ?
+        """,
+            ((quarantined_by, media_id) for media_id in local_mxcs),
+        )
+
+        txn.executemany(
+            """
+                UPDATE remote_media_cache
+                SET quarantined_by = ?
+                WHERE media_origin = ? AND media_id = ?
+            """,
+            ((quarantined_by, origin, media_id) for origin, media_id in remote_mxcs),
+        )
+
+        total_media_quarantined += len(local_mxcs)
+        total_media_quarantined += len(remote_mxcs)
+
+        return total_media_quarantined
+
 
 class RoomBackgroundUpdateStore(SQLBaseStore):
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py
index 6ceb483aa8..7a7e898843 100644
--- a/tests/rest/admin/test_admin.py
+++ b/tests/rest/admin/test_admin.py
@@ -14,11 +14,17 @@
 # limitations under the License.
 
 import json
+import os
+import urllib.parse
+from binascii import unhexlify
 
 from mock import Mock
 
+from twisted.internet.defer import Deferred
+
 import synapse.rest.admin
 from synapse.http.server import JsonResource
+from synapse.logging.context import make_deferred_yieldable
 from synapse.rest.admin import VersionServlet
 from synapse.rest.client.v1 import events, login, room
 from synapse.rest.client.v2_alpha import groups
@@ -346,3 +352,338 @@ class PurgeRoomTestCase(unittest.HomeserverTestCase):
             self.assertEqual(count, 0, msg="Rows not purged in {}".format(table))
 
     test_purge_room.skip = "Disabled because it's currently broken"
+
+
+class QuarantineMediaTestCase(unittest.HomeserverTestCase):
+    """Test /quarantine_media admin API.
+    """
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        synapse.rest.admin.register_servlets_for_media_repo,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs):
+        self.store = hs.get_datastore()
+        self.hs = hs
+
+        # Allow for uploading and downloading to/from the media repo
+        self.media_repo = hs.get_media_repository_resource()
+        self.download_resource = self.media_repo.children[b"download"]
+        self.upload_resource = self.media_repo.children[b"upload"]
+        self.image_data = unhexlify(
+            b"89504e470d0a1a0a0000000d4948445200000001000000010806"
+            b"0000001f15c4890000000a49444154789c63000100000500010d"
+            b"0a2db40000000049454e44ae426082"
+        )
+
+    def make_homeserver(self, reactor, clock):
+
+        self.fetches = []
+
+        def get_file(destination, path, output_stream, args=None, max_size=None):
+            """
+            Returns tuple[int,dict,str,int] of file length, response headers,
+            absolute URI, and response code.
+            """
+
+            def write_to(r):
+                data, response = r
+                output_stream.write(data)
+                return response
+
+            d = Deferred()
+            d.addCallback(write_to)
+            self.fetches.append((d, destination, path, args))
+            return make_deferred_yieldable(d)
+
+        client = Mock()
+        client.get_file = get_file
+
+        self.storage_path = self.mktemp()
+        self.media_store_path = self.mktemp()
+        os.mkdir(self.storage_path)
+        os.mkdir(self.media_store_path)
+
+        config = self.default_config()
+        config["media_store_path"] = self.media_store_path
+        config["thumbnail_requirements"] = {}
+        config["max_image_pixels"] = 2000000
+
+        provider_config = {
+            "module": "synapse.rest.media.v1.storage_provider.FileStorageProviderBackend",
+            "store_local": True,
+            "store_synchronous": False,
+            "store_remote": True,
+            "config": {"directory": self.storage_path},
+        }
+        config["media_storage_providers"] = [provider_config]
+
+        hs = self.setup_test_homeserver(config=config, http_client=client)
+
+        return hs
+
+    def test_quarantine_media_requires_admin(self):
+        self.register_user("nonadmin", "pass", admin=False)
+        non_admin_user_tok = self.login("nonadmin", "pass")
+
+        # Attempt quarantine media APIs as non-admin
+        url = "/_synapse/admin/v1/media/quarantine/example.org/abcde12345"
+        request, channel = self.make_request(
+            "POST", url.encode("ascii"), access_token=non_admin_user_tok,
+        )
+        self.render(request)
+
+        # Expect a forbidden error
+        self.assertEqual(
+            403,
+            int(channel.result["code"]),
+            msg="Expected forbidden on quarantining media as a non-admin",
+        )
+
+        # And the roomID/userID endpoint
+        url = "/_synapse/admin/v1/room/!room%3Aexample.com/media/quarantine"
+        request, channel = self.make_request(
+            "POST", url.encode("ascii"), access_token=non_admin_user_tok,
+        )
+        self.render(request)
+
+        # Expect a forbidden error
+        self.assertEqual(
+            403,
+            int(channel.result["code"]),
+            msg="Expected forbidden on quarantining media as a non-admin",
+        )
+
+    def test_quarantine_media_by_id(self):
+        self.register_user("id_admin", "pass", admin=True)
+        admin_user_tok = self.login("id_admin", "pass")
+
+        self.register_user("id_nonadmin", "pass", admin=False)
+        non_admin_user_tok = self.login("id_nonadmin", "pass")
+
+        # Upload some media into the room
+        response = self.helper.upload_media(
+            self.upload_resource, self.image_data, tok=admin_user_tok
+        )
+
+        # Extract media ID from the response
+        server_name_and_media_id = response["content_uri"][
+            6:
+        ]  # Cut off the 'mxc://' bit
+        server_name, media_id = server_name_and_media_id.split("/")
+
+        # Attempt to access the media
+        request, channel = self.make_request(
+            "GET",
+            server_name_and_media_id,
+            shorthand=False,
+            access_token=non_admin_user_tok,
+        )
+        request.render(self.download_resource)
+        self.pump(1.0)
+
+        # Should be successful
+        self.assertEqual(200, int(channel.code), msg=channel.result["body"])
+
+        # Quarantine the media
+        url = "/_synapse/admin/v1/media/quarantine/%s/%s" % (
+            urllib.parse.quote(server_name),
+            urllib.parse.quote(media_id),
+        )
+        request, channel = self.make_request("POST", url, access_token=admin_user_tok,)
+        self.render(request)
+        self.pump(1.0)
+        self.assertEqual(200, int(channel.code), msg=channel.result["body"])
+
+        # Attempt to access the media
+        request, channel = self.make_request(
+            "GET",
+            server_name_and_media_id,
+            shorthand=False,
+            access_token=admin_user_tok,
+        )
+        request.render(self.download_resource)
+        self.pump(1.0)
+
+        # Should be quarantined
+        self.assertEqual(
+            404,
+            int(channel.code),
+            msg=(
+                "Expected to receive a 404 on accessing quarantined media: %s"
+                % server_name_and_media_id
+            ),
+        )
+
+    def test_quarantine_all_media_in_room(self):
+        self.register_user("room_admin", "pass", admin=True)
+        admin_user_tok = self.login("room_admin", "pass")
+
+        non_admin_user = self.register_user("room_nonadmin", "pass", admin=False)
+        non_admin_user_tok = self.login("room_nonadmin", "pass")
+
+        room_id = self.helper.create_room_as(non_admin_user, tok=admin_user_tok)
+        self.helper.join(room_id, non_admin_user, tok=non_admin_user_tok)
+
+        # Upload some media
+        response_1 = self.helper.upload_media(
+            self.upload_resource, self.image_data, tok=non_admin_user_tok
+        )
+        response_2 = self.helper.upload_media(
+            self.upload_resource, self.image_data, tok=non_admin_user_tok
+        )
+
+        # Extract mxcs
+        mxc_1 = response_1["content_uri"]
+        mxc_2 = response_2["content_uri"]
+
+        # Send it into the room
+        self.helper.send_event(
+            room_id,
+            "m.room.message",
+            content={"body": "image-1", "msgtype": "m.image", "url": mxc_1},
+            txn_id="111",
+            tok=non_admin_user_tok,
+        )
+        self.helper.send_event(
+            room_id,
+            "m.room.message",
+            content={"body": "image-2", "msgtype": "m.image", "url": mxc_2},
+            txn_id="222",
+            tok=non_admin_user_tok,
+        )
+
+        # Quarantine all media in the room
+        url = "/_synapse/admin/v1/room/%s/media/quarantine" % urllib.parse.quote(
+            room_id
+        )
+        request, channel = self.make_request("POST", url, access_token=admin_user_tok,)
+        self.render(request)
+        self.pump(1.0)
+        self.assertEqual(200, int(channel.code), msg=channel.result["body"])
+        self.assertEqual(
+            json.loads(channel.result["body"].decode("utf-8")),
+            {"num_quarantined": 2},
+            "Expected 2 quarantined items",
+        )
+
+        # Convert mxc URLs to server/media_id strings
+        server_and_media_id_1 = mxc_1[6:]
+        server_and_media_id_2 = mxc_2[6:]
+
+        # Test that we cannot download any of the media anymore
+        request, channel = self.make_request(
+            "GET",
+            server_and_media_id_1,
+            shorthand=False,
+            access_token=non_admin_user_tok,
+        )
+        request.render(self.download_resource)
+        self.pump(1.0)
+
+        # Should be quarantined
+        self.assertEqual(
+            404,
+            int(channel.code),
+            msg=(
+                "Expected to receive a 404 on accessing quarantined media: %s"
+                % server_and_media_id_1
+            ),
+        )
+
+        request, channel = self.make_request(
+            "GET",
+            server_and_media_id_2,
+            shorthand=False,
+            access_token=non_admin_user_tok,
+        )
+        request.render(self.download_resource)
+        self.pump(1.0)
+
+        # Should be quarantined
+        self.assertEqual(
+            404,
+            int(channel.code),
+            msg=(
+                "Expected to receive a 404 on accessing quarantined media: %s"
+                % server_and_media_id_2
+            ),
+        )
+
+    def test_quarantine_all_media_by_user(self):
+        self.register_user("user_admin", "pass", admin=True)
+        admin_user_tok = self.login("user_admin", "pass")
+
+        non_admin_user = self.register_user("user_nonadmin", "pass", admin=False)
+        non_admin_user_tok = self.login("user_nonadmin", "pass")
+
+        # Upload some media
+        response_1 = self.helper.upload_media(
+            self.upload_resource, self.image_data, tok=non_admin_user_tok
+        )
+        response_2 = self.helper.upload_media(
+            self.upload_resource, self.image_data, tok=non_admin_user_tok
+        )
+
+        # Extract media IDs
+        server_and_media_id_1 = response_1["content_uri"][6:]
+        server_and_media_id_2 = response_2["content_uri"][6:]
+
+        # Quarantine all media by this user
+        url = "/_synapse/admin/v1/user/%s/media/quarantine" % urllib.parse.quote(
+            non_admin_user
+        )
+        request, channel = self.make_request(
+            "POST", url.encode("ascii"), access_token=admin_user_tok,
+        )
+        self.render(request)
+        self.pump(1.0)
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(
+            json.loads(channel.result["body"].decode("utf-8")),
+            {"num_quarantined": 2},
+            "Expected 2 quarantined items",
+        )
+
+        # Attempt to access each piece of media
+        request, channel = self.make_request(
+            "GET",
+            server_and_media_id_1,
+            shorthand=False,
+            access_token=non_admin_user_tok,
+        )
+        request.render(self.download_resource)
+        self.pump(1.0)
+
+        # Should be quarantined
+        self.assertEqual(
+            404,
+            int(channel.code),
+            msg=(
+                "Expected to receive a 404 on accessing quarantined media: %s"
+                % server_and_media_id_1,
+            ),
+        )
+
+        # Attempt to access each piece of media
+        request, channel = self.make_request(
+            "GET",
+            server_and_media_id_2,
+            shorthand=False,
+            access_token=non_admin_user_tok,
+        )
+        request.render(self.download_resource)
+        self.pump(1.0)
+
+        # Should be quarantined
+        self.assertEqual(
+            404,
+            int(channel.code),
+            msg=(
+                "Expected to receive a 404 on accessing quarantined media: %s"
+                % server_and_media_id_2
+            ),
+        )
diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py
index e7417b3d14..873d5ef99c 100644
--- a/tests/rest/client/v1/utils.py
+++ b/tests/rest/client/v1/utils.py
@@ -21,6 +21,8 @@ import time
 
 import attr
 
+from twisted.web.resource import Resource
+
 from synapse.api.constants import Membership
 
 from tests.server import make_request, render
@@ -160,3 +162,38 @@ class RestHelper(object):
         )
 
         return channel.json_body
+
+    def upload_media(
+        self,
+        resource: Resource,
+        image_data: bytes,
+        tok: str,
+        filename: str = "test.png",
+        expect_code: int = 200,
+    ) -> dict:
+        """Upload a piece of test media to the media repo
+        Args:
+            resource: The resource that will handle the upload request
+            image_data: The image data to upload
+            tok: The user token to use during the upload
+            filename: The filename of the media to be uploaded
+            expect_code: The return code to expect from attempting to upload the media
+        """
+        image_length = len(image_data)
+        path = "/_matrix/media/r0/upload?filename=%s" % (filename,)
+        request, channel = make_request(
+            self.hs.get_reactor(), "POST", path, content=image_data, access_token=tok
+        )
+        request.requestHeaders.addRawHeader(
+            b"Content-Length", str(image_length).encode("UTF-8")
+        )
+        request.render(resource)
+        self.hs.get_reactor().pump([100])
+
+        assert channel.code == expect_code, "Expected: %d, got: %d, resp: %r" % (
+            expect_code,
+            int(channel.result["code"]),
+            channel.result["body"],
+        )
+
+        return channel.json_body