diff --git a/tests/rest/client/v2_alpha/test_auth.py b/tests/rest/client/v2_alpha/test_auth.py
index 485e3650c3..6b90f838b6 100644
--- a/tests/rest/client/v2_alpha/test_auth.py
+++ b/tests/rest/client/v2_alpha/test_auth.py
@@ -20,7 +20,7 @@ import synapse.rest.admin
from synapse.api.constants import LoginType
from synapse.handlers.ui_auth.checkers import UserInteractiveAuthChecker
from synapse.rest.client.v1 import login
-from synapse.rest.client.v2_alpha import auth, devices, register
+from synapse.rest.client.v2_alpha import account, auth, devices, register
from synapse.rest.synapse.client import build_synapse_client_resource_tree
from synapse.types import JsonDict, UserID
@@ -498,3 +498,221 @@ class UIAuthTests(unittest.HomeserverTestCase):
self.delete_device(
self.user_tok, self.device_id, 403, body={"auth": {"session": session_id}}
)
+
+
+class RefreshAuthTests(unittest.HomeserverTestCase):
+ servlets = [
+ auth.register_servlets,
+ account.register_servlets,
+ login.register_servlets,
+ synapse.rest.admin.register_servlets_for_client_rest_resource,
+ register.register_servlets,
+ ]
+ hijack_auth = False
+
+ def prepare(self, reactor, clock, hs):
+ self.user_pass = "pass"
+ self.user = self.register_user("test", self.user_pass)
+
+ def test_login_issue_refresh_token(self):
+ """
+ A login response should include a refresh_token only if asked.
+ """
+ # Test login
+ body = {"type": "m.login.password", "user": "test", "password": self.user_pass}
+
+ login_without_refresh = self.make_request(
+ "POST", "/_matrix/client/r0/login", body
+ )
+ self.assertEqual(login_without_refresh.code, 200, login_without_refresh.result)
+ self.assertNotIn("refresh_token", login_without_refresh.json_body)
+
+ login_with_refresh = self.make_request(
+ "POST",
+ "/_matrix/client/r0/login?org.matrix.msc2918.refresh_token=true",
+ body,
+ )
+ self.assertEqual(login_with_refresh.code, 200, login_with_refresh.result)
+ self.assertIn("refresh_token", login_with_refresh.json_body)
+ self.assertIn("expires_in_ms", login_with_refresh.json_body)
+
+ def test_register_issue_refresh_token(self):
+ """
+ A register response should include a refresh_token only if asked.
+ """
+ register_without_refresh = self.make_request(
+ "POST",
+ "/_matrix/client/r0/register",
+ {
+ "username": "test2",
+ "password": self.user_pass,
+ "auth": {"type": LoginType.DUMMY},
+ },
+ )
+ self.assertEqual(
+ register_without_refresh.code, 200, register_without_refresh.result
+ )
+ self.assertNotIn("refresh_token", register_without_refresh.json_body)
+
+ register_with_refresh = self.make_request(
+ "POST",
+ "/_matrix/client/r0/register?org.matrix.msc2918.refresh_token=true",
+ {
+ "username": "test3",
+ "password": self.user_pass,
+ "auth": {"type": LoginType.DUMMY},
+ },
+ )
+ self.assertEqual(register_with_refresh.code, 200, register_with_refresh.result)
+ self.assertIn("refresh_token", register_with_refresh.json_body)
+ self.assertIn("expires_in_ms", register_with_refresh.json_body)
+
+ def test_token_refresh(self):
+ """
+ A refresh token can be used to issue a new access token.
+ """
+ body = {"type": "m.login.password", "user": "test", "password": self.user_pass}
+ login_response = self.make_request(
+ "POST",
+ "/_matrix/client/r0/login?org.matrix.msc2918.refresh_token=true",
+ body,
+ )
+ self.assertEqual(login_response.code, 200, login_response.result)
+
+ refresh_response = self.make_request(
+ "POST",
+ "/_matrix/client/unstable/org.matrix.msc2918.refresh_token/refresh",
+ {"refresh_token": login_response.json_body["refresh_token"]},
+ )
+ self.assertEqual(refresh_response.code, 200, refresh_response.result)
+ self.assertIn("access_token", refresh_response.json_body)
+ self.assertIn("refresh_token", refresh_response.json_body)
+ self.assertIn("expires_in_ms", refresh_response.json_body)
+
+ # The access and refresh tokens should be different from the original ones after refresh
+ self.assertNotEqual(
+ login_response.json_body["access_token"],
+ refresh_response.json_body["access_token"],
+ )
+ self.assertNotEqual(
+ login_response.json_body["refresh_token"],
+ refresh_response.json_body["refresh_token"],
+ )
+
+ @override_config({"access_token_lifetime": "1m"})
+ def test_refresh_token_expiration(self):
+ """
+ The access token should have some time as specified in the config.
+ """
+ body = {"type": "m.login.password", "user": "test", "password": self.user_pass}
+ login_response = self.make_request(
+ "POST",
+ "/_matrix/client/r0/login?org.matrix.msc2918.refresh_token=true",
+ body,
+ )
+ self.assertEqual(login_response.code, 200, login_response.result)
+ self.assertApproximates(
+ login_response.json_body["expires_in_ms"], 60 * 1000, 100
+ )
+
+ refresh_response = self.make_request(
+ "POST",
+ "/_matrix/client/unstable/org.matrix.msc2918.refresh_token/refresh",
+ {"refresh_token": login_response.json_body["refresh_token"]},
+ )
+ self.assertEqual(refresh_response.code, 200, refresh_response.result)
+ self.assertApproximates(
+ refresh_response.json_body["expires_in_ms"], 60 * 1000, 100
+ )
+
+ def test_refresh_token_invalidation(self):
+ """Refresh tokens are invalidated after first use of the next token.
+
+ A refresh token is considered invalid if:
+ - it was already used at least once
+ - and either
+ - the next access token was used
+ - the next refresh token was used
+
+ The chain of tokens goes like this:
+
+ login -|-> first_refresh -> third_refresh (fails)
+ |-> second_refresh -> fifth_refresh
+ |-> fourth_refresh (fails)
+ """
+
+ body = {"type": "m.login.password", "user": "test", "password": self.user_pass}
+ login_response = self.make_request(
+ "POST",
+ "/_matrix/client/r0/login?org.matrix.msc2918.refresh_token=true",
+ body,
+ )
+ self.assertEqual(login_response.code, 200, login_response.result)
+
+ # This first refresh should work properly
+ first_refresh_response = self.make_request(
+ "POST",
+ "/_matrix/client/unstable/org.matrix.msc2918.refresh_token/refresh",
+ {"refresh_token": login_response.json_body["refresh_token"]},
+ )
+ self.assertEqual(
+ first_refresh_response.code, 200, first_refresh_response.result
+ )
+
+ # This one as well, since the token in the first one was never used
+ second_refresh_response = self.make_request(
+ "POST",
+ "/_matrix/client/unstable/org.matrix.msc2918.refresh_token/refresh",
+ {"refresh_token": login_response.json_body["refresh_token"]},
+ )
+ self.assertEqual(
+ second_refresh_response.code, 200, second_refresh_response.result
+ )
+
+ # This one should not, since the token from the first refresh is not valid anymore
+ third_refresh_response = self.make_request(
+ "POST",
+ "/_matrix/client/unstable/org.matrix.msc2918.refresh_token/refresh",
+ {"refresh_token": first_refresh_response.json_body["refresh_token"]},
+ )
+ self.assertEqual(
+ third_refresh_response.code, 401, third_refresh_response.result
+ )
+
+ # The associated access token should also be invalid
+ whoami_response = self.make_request(
+ "GET",
+ "/_matrix/client/r0/account/whoami",
+ access_token=first_refresh_response.json_body["access_token"],
+ )
+ self.assertEqual(whoami_response.code, 401, whoami_response.result)
+
+ # But all other tokens should work (they will expire after some time)
+ for access_token in [
+ second_refresh_response.json_body["access_token"],
+ login_response.json_body["access_token"],
+ ]:
+ whoami_response = self.make_request(
+ "GET", "/_matrix/client/r0/account/whoami", access_token=access_token
+ )
+ self.assertEqual(whoami_response.code, 200, whoami_response.result)
+
+ # Now that the access token from the last valid refresh was used once, refreshing with the N-1 token should fail
+ fourth_refresh_response = self.make_request(
+ "POST",
+ "/_matrix/client/unstable/org.matrix.msc2918.refresh_token/refresh",
+ {"refresh_token": login_response.json_body["refresh_token"]},
+ )
+ self.assertEqual(
+ fourth_refresh_response.code, 403, fourth_refresh_response.result
+ )
+
+ # But refreshing from the last valid refresh token still works
+ fifth_refresh_response = self.make_request(
+ "POST",
+ "/_matrix/client/unstable/org.matrix.msc2918.refresh_token/refresh",
+ {"refresh_token": second_refresh_response.json_body["refresh_token"]},
+ )
+ self.assertEqual(
+ fifth_refresh_response.code, 200, fifth_refresh_response.result
+ )
|