summary refs log tree commit diff
path: root/tests/rest/admin
diff options
context:
space:
mode:
Diffstat (limited to 'tests/rest/admin')
-rw-r--r--tests/rest/admin/test_user.py282
1 files changed, 282 insertions, 0 deletions
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index d4b7ae21d1..e815b92329 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -17,6 +17,7 @@ import hashlib
 import hmac
 import json
 import urllib.parse
+from binascii import unhexlify
 
 from mock import Mock
 
@@ -1115,3 +1116,284 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
         self.assertEqual(200, channel.code, msg=channel.json_body)
         self.assertEqual(number_rooms, channel.json_body["total"])
         self.assertEqual(number_rooms, len(channel.json_body["joined_rooms"]))
+
+
+class UserMediaRestTestCase(unittest.HomeserverTestCase):
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs):
+        self.store = hs.get_datastore()
+        self.media_repo = hs.get_media_repository_resource()
+
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+
+        self.other_user = self.register_user("user", "pass")
+        self.url = "/_synapse/admin/v1/users/%s/media" % urllib.parse.quote(
+            self.other_user
+        )
+
+    def test_no_auth(self):
+        """
+        Try to list media of an user without authentication.
+        """
+        request, channel = self.make_request("GET", self.url, b"{}")
+        self.render(request)
+
+        self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
+
+    def test_requester_is_no_admin(self):
+        """
+        If the user is not a server admin, an error is returned.
+        """
+        other_user_token = self.login("user", "pass")
+
+        request, channel = self.make_request(
+            "GET", self.url, access_token=other_user_token,
+        )
+        self.render(request)
+
+        self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+    def test_user_does_not_exist(self):
+        """
+        Tests that a lookup for a user that does not exist returns a 404
+        """
+        url = "/_synapse/admin/v1/users/@unknown_person:test/media"
+        request, channel = self.make_request(
+            "GET", url, access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(404, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
+
+    def test_user_is_not_local(self):
+        """
+        Tests that a lookup for a user that is not a local returns a 400
+        """
+        url = "/_synapse/admin/v1/users/@unknown_person:unknown_domain/media"
+
+        request, channel = self.make_request(
+            "GET", url, access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(400, channel.code, msg=channel.json_body)
+        self.assertEqual("Can only lookup local users", channel.json_body["error"])
+
+    def test_limit(self):
+        """
+        Testing list of media with limit
+        """
+
+        number_media = 20
+        other_user_tok = self.login("user", "pass")
+        self._create_media(other_user_tok, number_media)
+
+        request, channel = self.make_request(
+            "GET", self.url + "?limit=5", access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(channel.json_body["total"], number_media)
+        self.assertEqual(len(channel.json_body["media"]), 5)
+        self.assertEqual(channel.json_body["next_token"], 5)
+        self._check_fields(channel.json_body["media"])
+
+    def test_from(self):
+        """
+        Testing list of media with a defined starting point (from)
+        """
+
+        number_media = 20
+        other_user_tok = self.login("user", "pass")
+        self._create_media(other_user_tok, number_media)
+
+        request, channel = self.make_request(
+            "GET", self.url + "?from=5", access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(channel.json_body["total"], number_media)
+        self.assertEqual(len(channel.json_body["media"]), 15)
+        self.assertNotIn("next_token", channel.json_body)
+        self._check_fields(channel.json_body["media"])
+
+    def test_limit_and_from(self):
+        """
+        Testing list of media with a defined starting point and limit
+        """
+
+        number_media = 20
+        other_user_tok = self.login("user", "pass")
+        self._create_media(other_user_tok, number_media)
+
+        request, channel = self.make_request(
+            "GET", self.url + "?from=5&limit=10", access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(channel.json_body["total"], number_media)
+        self.assertEqual(channel.json_body["next_token"], 15)
+        self.assertEqual(len(channel.json_body["media"]), 10)
+        self._check_fields(channel.json_body["media"])
+
+    def test_limit_is_negative(self):
+        """
+        Testing that a negative limit parameter returns a 400
+        """
+
+        request, channel = self.make_request(
+            "GET", self.url + "?limit=-5", access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+    def test_from_is_negative(self):
+        """
+        Testing that a negative from parameter returns a 400
+        """
+
+        request, channel = self.make_request(
+            "GET", self.url + "?from=-5", access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+    def test_next_token(self):
+        """
+        Testing that `next_token` appears at the right place
+        """
+
+        number_media = 20
+        other_user_tok = self.login("user", "pass")
+        self._create_media(other_user_tok, number_media)
+
+        #  `next_token` does not appear
+        # Number of results is the number of entries
+        request, channel = self.make_request(
+            "GET", self.url + "?limit=20", access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(channel.json_body["total"], number_media)
+        self.assertEqual(len(channel.json_body["media"]), number_media)
+        self.assertNotIn("next_token", channel.json_body)
+
+        #  `next_token` does not appear
+        # Number of max results is larger than the number of entries
+        request, channel = self.make_request(
+            "GET", self.url + "?limit=21", access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(channel.json_body["total"], number_media)
+        self.assertEqual(len(channel.json_body["media"]), number_media)
+        self.assertNotIn("next_token", channel.json_body)
+
+        #  `next_token` does appear
+        # Number of max results is smaller than the number of entries
+        request, channel = self.make_request(
+            "GET", self.url + "?limit=19", access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(channel.json_body["total"], number_media)
+        self.assertEqual(len(channel.json_body["media"]), 19)
+        self.assertEqual(channel.json_body["next_token"], 19)
+
+        # Check
+        # Set `from` to value of `next_token` for request remaining entries
+        #  `next_token` does not appear
+        request, channel = self.make_request(
+            "GET", self.url + "?from=19", access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(channel.json_body["total"], number_media)
+        self.assertEqual(len(channel.json_body["media"]), 1)
+        self.assertNotIn("next_token", channel.json_body)
+
+    def test_user_has_no_media(self):
+        """
+        Tests that a normal lookup for media is successfully
+        if user has no media created
+        """
+
+        request, channel = self.make_request(
+            "GET", self.url, access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertEqual(0, channel.json_body["total"])
+        self.assertEqual(0, len(channel.json_body["media"]))
+
+    def test_get_media(self):
+        """
+        Tests that a normal lookup for media is successfully
+        """
+
+        number_media = 5
+        other_user_tok = self.login("user", "pass")
+        self._create_media(other_user_tok, number_media)
+
+        request, channel = self.make_request(
+            "GET", self.url, access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertEqual(number_media, channel.json_body["total"])
+        self.assertEqual(number_media, len(channel.json_body["media"]))
+        self.assertNotIn("next_token", channel.json_body)
+        self._check_fields(channel.json_body["media"])
+
+    def _create_media(self, user_token, number_media):
+        """
+        Create a number of media for a specific user
+        """
+        upload_resource = self.media_repo.children[b"upload"]
+        for i in range(number_media):
+            # file size is 67 Byte
+            image_data = unhexlify(
+                b"89504e470d0a1a0a0000000d4948445200000001000000010806"
+                b"0000001f15c4890000000a49444154789c63000100000500010d"
+                b"0a2db40000000049454e44ae426082"
+            )
+
+            # Upload some media into the room
+            self.helper.upload_media(
+                upload_resource, image_data, tok=user_token, expect_code=200
+            )
+
+    def _check_fields(self, content):
+        """Checks that all attributes are present in content
+        """
+        for m in content:
+            self.assertIn("media_id", m)
+            self.assertIn("media_type", m)
+            self.assertIn("media_length", m)
+            self.assertIn("upload_name", m)
+            self.assertIn("created_ts", m)
+            self.assertIn("last_access_ts", m)
+            self.assertIn("quarantined_by", m)
+            self.assertIn("safe_from_quarantine", m)