diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 98d0623734..3aa7414787 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -23,8 +23,8 @@ from mock import Mock
import synapse.rest.admin
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, HttpResponseException, ResourceLimitError
-from synapse.rest.client.v1 import login, room
-from synapse.rest.client.v2_alpha import sync
+from synapse.rest.client.v1 import login, logout, room
+from synapse.rest.client.v2_alpha import devices, sync
from tests import unittest
from tests.test_utils import make_awaitable
@@ -1101,3 +1101,191 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(number_rooms, channel.json_body["total"])
self.assertEqual(number_rooms, len(channel.json_body["joined_rooms"]))
+
+
+class UserTokenRestTestCase(unittest.HomeserverTestCase):
+ """Test for /_synapse/admin/v1/users/<user>/login
+ """
+
+ servlets = [
+ synapse.rest.admin.register_servlets,
+ login.register_servlets,
+ sync.register_servlets,
+ room.register_servlets,
+ devices.register_servlets,
+ logout.register_servlets,
+ ]
+
+ def prepare(self, reactor, clock, hs):
+ self.store = hs.get_datastore()
+
+ self.admin_user = self.register_user("admin", "pass", admin=True)
+ self.admin_user_tok = self.login("admin", "pass")
+
+ self.other_user = self.register_user("user", "pass")
+ self.other_user_tok = self.login("user", "pass")
+ self.url = "/_synapse/admin/v1/users/%s/login" % urllib.parse.quote(
+ self.other_user
+ )
+
+ def _get_token(self) -> str:
+ request, channel = self.make_request(
+ "PUT", self.url, b"{}", access_token=self.admin_user_tok
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ return channel.json_body["access_token"]
+
+ def test_no_auth(self):
+ """Try to login as a user without authentication.
+ """
+ request, channel = self.make_request("PUT", self.url, b"{}")
+ self.render(request)
+
+ self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
+
+ def test_not_admin(self):
+ """Try to login as a user as a non-admin user.
+ """
+ request, channel = self.make_request(
+ "PUT", self.url, b"{}", access_token=self.other_user_tok
+ )
+ self.render(request)
+
+ self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+
+ def test_send_event(self):
+ """Test that sending event as a user works.
+ """
+ # Create a room.
+ room_id = self.helper.create_room_as(self.other_user, tok=self.other_user_tok)
+
+ # Login in as the user
+ puppet_token = self._get_token()
+
+ # Test that sending works, and generates the event as the right user.
+ resp = self.helper.send_event(room_id, "com.example.test", tok=puppet_token)
+ event_id = resp["event_id"]
+ event = self.get_success(self.store.get_event(event_id))
+ self.assertEqual(event.sender, self.other_user)
+
+ def test_devices(self):
+ """Tests that logging in as a user doesn't create a new device for them.
+ """
+ # Login in as the user
+ self._get_token()
+
+ # Check that we don't see a new device in our devices list
+ request, channel = self.make_request(
+ "GET", "devices", b"{}", access_token=self.other_user_tok
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # We should only see the one device (from the login in `prepare`)
+ self.assertEqual(len(channel.json_body["devices"]), 1)
+
+ def test_logout(self):
+ """Test that calling `/logout` with the token works.
+ """
+ # Login in as the user
+ puppet_token = self._get_token()
+
+ # Test that we can successfully make a request
+ request, channel = self.make_request(
+ "GET", "devices", b"{}", access_token=puppet_token
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # Logout with the puppet token
+ request, channel = self.make_request(
+ "POST", "logout", b"{}", access_token=puppet_token
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # The puppet token should no longer work
+ request, channel = self.make_request(
+ "GET", "devices", b"{}", access_token=puppet_token
+ )
+ self.render(request)
+ self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+
+ # .. but the real user's tokens should still work
+ request, channel = self.make_request(
+ "GET", "devices", b"{}", access_token=self.other_user_tok
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ def test_user_logout_all(self):
+ """Tests that the target user calling `/logout/all` does *not* expire
+ the token.
+ """
+ # Login in as the user
+ puppet_token = self._get_token()
+
+ # Test that we can successfully make a request
+ request, channel = self.make_request(
+ "GET", "devices", b"{}", access_token=puppet_token
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # Logout all with the real user token
+ request, channel = self.make_request(
+ "POST", "logout/all", b"{}", access_token=self.other_user_tok
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # The puppet token should still work
+ request, channel = self.make_request(
+ "GET", "devices", b"{}", access_token=puppet_token
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # .. but the real user's tokens shouldn't
+ request, channel = self.make_request(
+ "GET", "devices", b"{}", access_token=self.other_user_tok
+ )
+ self.render(request)
+ self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+
+ def test_admin_logout_all(self):
+ """Tests that the admin user calling `/logout/all` does expire the
+ token.
+ """
+ # Login in as the user
+ puppet_token = self._get_token()
+
+ # Test that we can successfully make a request
+ request, channel = self.make_request(
+ "GET", "devices", b"{}", access_token=puppet_token
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # Logout all with the admin user token
+ request, channel = self.make_request(
+ "POST", "logout/all", b"{}", access_token=self.admin_user_tok
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # The puppet token should no longer work
+ request, channel = self.make_request(
+ "GET", "devices", b"{}", access_token=puppet_token
+ )
+ self.render(request)
+ self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+
+ # .. but the real user's tokens should still work
+ request, channel = self.make_request(
+ "GET", "devices", b"{}", access_token=self.other_user_tok
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
|