diff --git a/tests/rest/client/v1/test_login.py b/tests/rest/client/v1/test_login.py
index 2668662c9e..5d987a30c7 100644
--- a/tests/rest/client/v1/test_login.py
+++ b/tests/rest/client/v1/test_login.py
@@ -7,8 +7,9 @@ from mock import Mock
import jwt
import synapse.rest.admin
+from synapse.appservice import ApplicationService
from synapse.rest.client.v1 import login, logout
-from synapse.rest.client.v2_alpha import devices
+from synapse.rest.client.v2_alpha import devices, register
from synapse.rest.client.v2_alpha.account import WhoamiRestServlet
from tests import unittest
@@ -748,3 +749,134 @@ class JWTPubKeyTestCase(unittest.HomeserverTestCase):
channel.json_body["error"],
"JWT validation failed: Signature verification failed",
)
+
+
+AS_USER = "as_user_alice"
+
+
+class AppserviceLoginRestServletTestCase(unittest.HomeserverTestCase):
+ servlets = [
+ login.register_servlets,
+ register.register_servlets,
+ ]
+
+ def register_as_user(self, username):
+ request, channel = self.make_request(
+ b"POST",
+ "/_matrix/client/r0/register?access_token=%s" % (self.service.token,),
+ {"username": username},
+ )
+ self.render(request)
+
+ def make_homeserver(self, reactor, clock):
+ self.hs = self.setup_test_homeserver()
+
+ self.service = ApplicationService(
+ id="unique_identifier",
+ token="some_token",
+ hostname="example.com",
+ sender="@asbot:example.com",
+ namespaces={
+ ApplicationService.NS_USERS: [
+ {"regex": r"@as_user.*", "exclusive": False}
+ ],
+ ApplicationService.NS_ROOMS: [],
+ ApplicationService.NS_ALIASES: [],
+ },
+ )
+ self.another_service = ApplicationService(
+ id="another__identifier",
+ token="another_token",
+ hostname="example.com",
+ sender="@as2bot:example.com",
+ namespaces={
+ ApplicationService.NS_USERS: [
+ {"regex": r"@as2_user.*", "exclusive": False}
+ ],
+ ApplicationService.NS_ROOMS: [],
+ ApplicationService.NS_ALIASES: [],
+ },
+ )
+
+ self.hs.get_datastore().services_cache.append(self.service)
+ self.hs.get_datastore().services_cache.append(self.another_service)
+ return self.hs
+
+ def test_login_appservice_user(self):
+ """Test that an appservice user can use /login
+ """
+ self.register_as_user(AS_USER)
+
+ params = {
+ "type": login.LoginRestServlet.APPSERVICE_TYPE,
+ "identifier": {"type": "m.id.user", "user": AS_USER},
+ }
+ request, channel = self.make_request(
+ b"POST", LOGIN_URL, params, access_token=self.service.token
+ )
+
+ self.render(request)
+ self.assertEquals(channel.result["code"], b"200", channel.result)
+
+ def test_login_appservice_user_bot(self):
+ """Test that the appservice bot can use /login
+ """
+ self.register_as_user(AS_USER)
+
+ params = {
+ "type": login.LoginRestServlet.APPSERVICE_TYPE,
+ "identifier": {"type": "m.id.user", "user": self.service.sender},
+ }
+ request, channel = self.make_request(
+ b"POST", LOGIN_URL, params, access_token=self.service.token
+ )
+
+ self.render(request)
+ self.assertEquals(channel.result["code"], b"200", channel.result)
+
+ def test_login_appservice_wrong_user(self):
+ """Test that non-as users cannot login with the as token
+ """
+ self.register_as_user(AS_USER)
+
+ params = {
+ "type": login.LoginRestServlet.APPSERVICE_TYPE,
+ "identifier": {"type": "m.id.user", "user": "fibble_wibble"},
+ }
+ request, channel = self.make_request(
+ b"POST", LOGIN_URL, params, access_token=self.service.token
+ )
+
+ self.render(request)
+ self.assertEquals(channel.result["code"], b"403", channel.result)
+
+ def test_login_appservice_wrong_as(self):
+ """Test that as users cannot login with wrong as token
+ """
+ self.register_as_user(AS_USER)
+
+ params = {
+ "type": login.LoginRestServlet.APPSERVICE_TYPE,
+ "identifier": {"type": "m.id.user", "user": AS_USER},
+ }
+ request, channel = self.make_request(
+ b"POST", LOGIN_URL, params, access_token=self.another_service.token
+ )
+
+ self.render(request)
+ self.assertEquals(channel.result["code"], b"403", channel.result)
+
+ def test_login_appservice_no_token(self):
+ """Test that users must provide a token when using the appservice
+ login method
+ """
+ self.register_as_user(AS_USER)
+
+ params = {
+ "type": login.LoginRestServlet.APPSERVICE_TYPE,
+ "identifier": {"type": "m.id.user", "user": AS_USER},
+ }
+ request, channel = self.make_request(b"POST", LOGIN_URL, params)
+
+ self.render(request)
+ self.assertEquals(channel.result["code"], b"401", channel.result)
|