diff --git a/tests/rest/client/test_auth.py b/tests/rest/client/test_auth.py
index 090cef5216..ebf653d018 100644
--- a/tests/rest/client/test_auth.py
+++ b/tests/rest/client/test_auth.py
@@ -465,9 +465,11 @@ class UIAuthTests(unittest.HomeserverTestCase):
* checking that the original operation succeeds
"""
+ fake_oidc_server = self.helper.fake_oidc_server()
+
# log the user in
remote_user_id = UserID.from_string(self.user).localpart
- login_resp = self.helper.login_via_oidc(remote_user_id)
+ login_resp, _ = self.helper.login_via_oidc(fake_oidc_server, remote_user_id)
self.assertEqual(login_resp["user_id"], self.user)
# initiate a UI Auth process by attempting to delete the device
@@ -481,8 +483,8 @@ class UIAuthTests(unittest.HomeserverTestCase):
# run the UIA-via-SSO flow
session_id = channel.json_body["session"]
- channel = self.helper.auth_via_oidc(
- {"sub": remote_user_id}, ui_auth_session_id=session_id
+ channel, _ = self.helper.auth_via_oidc(
+ fake_oidc_server, {"sub": remote_user_id}, ui_auth_session_id=session_id
)
# that should serve a confirmation page
@@ -499,7 +501,8 @@ class UIAuthTests(unittest.HomeserverTestCase):
@skip_unless(HAS_OIDC, "requires OIDC")
@override_config({"oidc_config": TEST_OIDC_CONFIG})
def test_does_not_offer_password_for_sso_user(self) -> None:
- login_resp = self.helper.login_via_oidc("username")
+ fake_oidc_server = self.helper.fake_oidc_server()
+ login_resp, _ = self.helper.login_via_oidc(fake_oidc_server, "username")
user_tok = login_resp["access_token"]
device_id = login_resp["device_id"]
@@ -522,7 +525,10 @@ class UIAuthTests(unittest.HomeserverTestCase):
@override_config({"oidc_config": TEST_OIDC_CONFIG})
def test_offers_both_flows_for_upgraded_user(self) -> None:
"""A user that had a password and then logged in with SSO should get both flows"""
- login_resp = self.helper.login_via_oidc(UserID.from_string(self.user).localpart)
+ fake_oidc_server = self.helper.fake_oidc_server()
+ login_resp, _ = self.helper.login_via_oidc(
+ fake_oidc_server, UserID.from_string(self.user).localpart
+ )
self.assertEqual(login_resp["user_id"], self.user)
channel = self.delete_device(
@@ -539,8 +545,13 @@ class UIAuthTests(unittest.HomeserverTestCase):
@override_config({"oidc_config": TEST_OIDC_CONFIG})
def test_ui_auth_fails_for_incorrect_sso_user(self) -> None:
"""If the user tries to authenticate with the wrong SSO user, they get an error"""
+
+ fake_oidc_server = self.helper.fake_oidc_server()
+
# log the user in
- login_resp = self.helper.login_via_oidc(UserID.from_string(self.user).localpart)
+ login_resp, _ = self.helper.login_via_oidc(
+ fake_oidc_server, UserID.from_string(self.user).localpart
+ )
self.assertEqual(login_resp["user_id"], self.user)
# start a UI Auth flow by attempting to delete a device
@@ -553,8 +564,8 @@ class UIAuthTests(unittest.HomeserverTestCase):
session_id = channel.json_body["session"]
# do the OIDC auth, but auth as the wrong user
- channel = self.helper.auth_via_oidc(
- {"sub": "wrong_user"}, ui_auth_session_id=session_id
+ channel, _ = self.helper.auth_via_oidc(
+ fake_oidc_server, {"sub": "wrong_user"}, ui_auth_session_id=session_id
)
# that should return a failure message
@@ -584,7 +595,10 @@ class UIAuthTests(unittest.HomeserverTestCase):
"""Tests that if we register a user via SSO while requiring approval for new
accounts, we still raise the correct error before logging the user in.
"""
- login_resp = self.helper.login_via_oidc("username", expected_status=403)
+ fake_oidc_server = self.helper.fake_oidc_server()
+ login_resp, _ = self.helper.login_via_oidc(
+ fake_oidc_server, "username", expected_status=403
+ )
self.assertEqual(login_resp["errcode"], Codes.USER_AWAITING_APPROVAL)
self.assertEqual(
diff --git a/tests/rest/client/test_login.py b/tests/rest/client/test_login.py
index e801ba8c8b..ff5baa9f0a 100644
--- a/tests/rest/client/test_login.py
+++ b/tests/rest/client/test_login.py
@@ -36,7 +36,7 @@ from synapse.util import Clock
from tests import unittest
from tests.handlers.test_oidc import HAS_OIDC
from tests.handlers.test_saml import has_saml2
-from tests.rest.client.utils import TEST_OIDC_AUTH_ENDPOINT, TEST_OIDC_CONFIG
+from tests.rest.client.utils import TEST_OIDC_CONFIG
from tests.server import FakeChannel
from tests.test_utils.html_parsers import TestHtmlParser
from tests.unittest import HomeserverTestCase, override_config, skip_unless
@@ -612,13 +612,16 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
def test_login_via_oidc(self) -> None:
"""If OIDC is chosen, should redirect to the OIDC auth endpoint"""
- # pick the default OIDC provider
- channel = self.make_request(
- "GET",
- "/_synapse/client/pick_idp?redirectUrl="
- + urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL)
- + "&idp=oidc",
- )
+ fake_oidc_server = self.helper.fake_oidc_server()
+
+ with fake_oidc_server.patch_homeserver(hs=self.hs):
+ # pick the default OIDC provider
+ channel = self.make_request(
+ "GET",
+ "/_synapse/client/pick_idp?redirectUrl="
+ + urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL)
+ + "&idp=oidc",
+ )
self.assertEqual(channel.code, 302, channel.result)
location_headers = channel.headers.getRawHeaders("Location")
assert location_headers
@@ -626,7 +629,7 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
oidc_uri_path, oidc_uri_query = oidc_uri.split("?", 1)
# it should redirect us to the auth page of the OIDC server
- self.assertEqual(oidc_uri_path, TEST_OIDC_AUTH_ENDPOINT)
+ self.assertEqual(oidc_uri_path, fake_oidc_server.authorization_endpoint)
# ... and should have set a cookie including the redirect url
cookie_headers = channel.headers.getRawHeaders("Set-Cookie")
@@ -643,7 +646,9 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
TEST_CLIENT_REDIRECT_URL,
)
- channel = self.helper.complete_oidc_auth(oidc_uri, cookies, {"sub": "user1"})
+ channel, _ = self.helper.complete_oidc_auth(
+ fake_oidc_server, oidc_uri, cookies, {"sub": "user1"}
+ )
# that should serve a confirmation page
self.assertEqual(channel.code, 200, channel.result)
@@ -693,7 +698,10 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
def test_client_idp_redirect_to_oidc(self) -> None:
"""If the client pick a known IdP, redirect to it"""
- channel = self._make_sso_redirect_request("oidc")
+ fake_oidc_server = self.helper.fake_oidc_server()
+
+ with fake_oidc_server.patch_homeserver(hs=self.hs):
+ channel = self._make_sso_redirect_request("oidc")
self.assertEqual(channel.code, 302, channel.result)
location_headers = channel.headers.getRawHeaders("Location")
assert location_headers
@@ -701,7 +709,7 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
oidc_uri_path, oidc_uri_query = oidc_uri.split("?", 1)
# it should redirect us to the auth page of the OIDC server
- self.assertEqual(oidc_uri_path, TEST_OIDC_AUTH_ENDPOINT)
+ self.assertEqual(oidc_uri_path, fake_oidc_server.authorization_endpoint)
def _make_sso_redirect_request(self, idp_prov: Optional[str] = None) -> FakeChannel:
"""Send a request to /_matrix/client/r0/login/sso/redirect
@@ -1280,9 +1288,13 @@ class UsernamePickerTestCase(HomeserverTestCase):
def test_username_picker(self) -> None:
"""Test the happy path of a username picker flow."""
+ fake_oidc_server = self.helper.fake_oidc_server()
+
# do the start of the login flow
- channel = self.helper.auth_via_oidc(
- {"sub": "tester", "displayname": "Jonny"}, TEST_CLIENT_REDIRECT_URL
+ channel, _ = self.helper.auth_via_oidc(
+ fake_oidc_server,
+ {"sub": "tester", "displayname": "Jonny"},
+ TEST_CLIENT_REDIRECT_URL,
)
# that should redirect to the username picker
diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py
index c249a42bb6..967d229223 100644
--- a/tests/rest/client/utils.py
+++ b/tests/rest/client/utils.py
@@ -31,7 +31,6 @@ from typing import (
Tuple,
overload,
)
-from unittest.mock import patch
from urllib.parse import urlencode
import attr
@@ -46,8 +45,19 @@ from synapse.server import HomeServer
from synapse.types import JsonDict
from tests.server import FakeChannel, FakeSite, make_request
-from tests.test_utils import FakeResponse
from tests.test_utils.html_parsers import TestHtmlParser
+from tests.test_utils.oidc import FakeAuthorizationGrant, FakeOidcServer
+
+# an 'oidc_config' suitable for login_via_oidc.
+TEST_OIDC_ISSUER = "https://issuer.test/"
+TEST_OIDC_CONFIG = {
+ "enabled": True,
+ "issuer": TEST_OIDC_ISSUER,
+ "client_id": "test-client-id",
+ "client_secret": "test-client-secret",
+ "scopes": ["openid"],
+ "user_mapping_provider": {"config": {"localpart_template": "{{ user.sub }}"}},
+}
@attr.s(auto_attribs=True)
@@ -543,12 +553,28 @@ class RestHelper:
return channel.json_body
+ def fake_oidc_server(self, issuer: str = TEST_OIDC_ISSUER) -> FakeOidcServer:
+ """Create a ``FakeOidcServer``.
+
+ This can be used in conjuction with ``login_via_oidc``::
+
+ fake_oidc_server = self.helper.fake_oidc_server()
+ login_data, _ = self.helper.login_via_oidc(fake_oidc_server, "user")
+ """
+
+ return FakeOidcServer(
+ clock=self.hs.get_clock(),
+ issuer=issuer,
+ )
+
def login_via_oidc(
self,
+ fake_server: FakeOidcServer,
remote_user_id: str,
+ with_sid: bool = False,
expected_status: int = 200,
- ) -> JsonDict:
- """Log in via OIDC
+ ) -> Tuple[JsonDict, FakeAuthorizationGrant]:
+ """Log in (as a new user) via OIDC
Returns the result of the final token login.
@@ -560,7 +586,10 @@ class RestHelper:
the normal places.
"""
client_redirect_url = "https://x"
- channel = self.auth_via_oidc({"sub": remote_user_id}, client_redirect_url)
+ userinfo = {"sub": remote_user_id}
+ channel, grant = self.auth_via_oidc(
+ fake_server, userinfo, client_redirect_url, with_sid=with_sid
+ )
# expect a confirmation page
assert channel.code == HTTPStatus.OK, channel.result
@@ -585,14 +614,16 @@ class RestHelper:
assert (
channel.code == expected_status
), f"unexpected status in response: {channel.code}"
- return channel.json_body
+ return channel.json_body, grant
def auth_via_oidc(
self,
+ fake_server: FakeOidcServer,
user_info_dict: JsonDict,
client_redirect_url: Optional[str] = None,
ui_auth_session_id: Optional[str] = None,
- ) -> FakeChannel:
+ with_sid: bool = False,
+ ) -> Tuple[FakeChannel, FakeAuthorizationGrant]:
"""Perform an OIDC authentication flow via a mock OIDC provider.
This can be used for either login or user-interactive auth.
@@ -616,6 +647,7 @@ class RestHelper:
the login redirect endpoint
ui_auth_session_id: if set, we will perform a UI Auth flow. The session id
of the UI auth.
+ with_sid: if True, generates a random `sid` (OIDC session ID)
Returns:
A FakeChannel containing the result of calling the OIDC callback endpoint.
@@ -625,14 +657,15 @@ class RestHelper:
cookies: Dict[str, str] = {}
- # if we're doing a ui auth, hit the ui auth redirect endpoint
- if ui_auth_session_id:
- # can't set the client redirect url for UI Auth
- assert client_redirect_url is None
- oauth_uri = self.initiate_sso_ui_auth(ui_auth_session_id, cookies)
- else:
- # otherwise, hit the login redirect endpoint
- oauth_uri = self.initiate_sso_login(client_redirect_url, cookies)
+ with fake_server.patch_homeserver(hs=self.hs):
+ # if we're doing a ui auth, hit the ui auth redirect endpoint
+ if ui_auth_session_id:
+ # can't set the client redirect url for UI Auth
+ assert client_redirect_url is None
+ oauth_uri = self.initiate_sso_ui_auth(ui_auth_session_id, cookies)
+ else:
+ # otherwise, hit the login redirect endpoint
+ oauth_uri = self.initiate_sso_login(client_redirect_url, cookies)
# we now have a URI for the OIDC IdP, but we skip that and go straight
# back to synapse's OIDC callback resource. However, we do need the "state"
@@ -640,17 +673,21 @@ class RestHelper:
# that synapse passes to the client.
oauth_uri_path, _ = oauth_uri.split("?", 1)
- assert oauth_uri_path == TEST_OIDC_AUTH_ENDPOINT, (
+ assert oauth_uri_path == fake_server.authorization_endpoint, (
"unexpected SSO URI " + oauth_uri_path
)
- return self.complete_oidc_auth(oauth_uri, cookies, user_info_dict)
+ return self.complete_oidc_auth(
+ fake_server, oauth_uri, cookies, user_info_dict, with_sid=with_sid
+ )
def complete_oidc_auth(
self,
+ fake_serer: FakeOidcServer,
oauth_uri: str,
cookies: Mapping[str, str],
user_info_dict: JsonDict,
- ) -> FakeChannel:
+ with_sid: bool = False,
+ ) -> Tuple[FakeChannel, FakeAuthorizationGrant]:
"""Mock out an OIDC authentication flow
Assumes that an OIDC auth has been initiated by one of initiate_sso_login or
@@ -661,50 +698,37 @@ class RestHelper:
Requires the OIDC callback resource to be mounted at the normal place.
Args:
+ fake_server: the fake OIDC server with which the auth should be done
oauth_uri: the OIDC URI returned by synapse's redirect endpoint (ie,
from initiate_sso_login or initiate_sso_ui_auth).
cookies: the cookies set by synapse's redirect endpoint, which will be
sent back to the callback endpoint.
user_info_dict: the remote userinfo that the OIDC provider should present.
Typically this should be '{"sub": "<remote user id>"}'.
+ with_sid: if True, generates a random `sid` (OIDC session ID)
Returns:
A FakeChannel containing the result of calling the OIDC callback endpoint.
"""
_, oauth_uri_qs = oauth_uri.split("?", 1)
params = urllib.parse.parse_qs(oauth_uri_qs)
+
+ code, grant = fake_serer.start_authorization(
+ scope=params["scope"][0],
+ userinfo=user_info_dict,
+ client_id=params["client_id"][0],
+ redirect_uri=params["redirect_uri"][0],
+ nonce=params["nonce"][0],
+ with_sid=with_sid,
+ )
+ state = params["state"][0]
+
callback_uri = "%s?%s" % (
urllib.parse.urlparse(params["redirect_uri"][0]).path,
- urllib.parse.urlencode({"state": params["state"][0], "code": "TEST_CODE"}),
+ urllib.parse.urlencode({"state": state, "code": code}),
)
- # before we hit the callback uri, stub out some methods in the http client so
- # that we don't have to handle full HTTPS requests.
- # (expected url, json response) pairs, in the order we expect them.
- expected_requests = [
- # first we get a hit to the token endpoint, which we tell to return
- # a dummy OIDC access token
- (TEST_OIDC_TOKEN_ENDPOINT, {"access_token": "TEST"}),
- # and then one to the user_info endpoint, which returns our remote user id.
- (TEST_OIDC_USERINFO_ENDPOINT, user_info_dict),
- ]
-
- async def mock_req(
- method: str,
- uri: str,
- data: Optional[dict] = None,
- headers: Optional[Iterable[Tuple[AnyStr, AnyStr]]] = None,
- ):
- (expected_uri, resp_obj) = expected_requests.pop(0)
- assert uri == expected_uri
- resp = FakeResponse(
- code=HTTPStatus.OK,
- phrase=b"OK",
- body=json.dumps(resp_obj).encode("utf-8"),
- )
- return resp
-
- with patch.object(self.hs.get_proxied_http_client(), "request", mock_req):
+ with fake_serer.patch_homeserver(hs=self.hs):
# now hit the callback URI with the right params and a made-up code
channel = make_request(
self.hs.get_reactor(),
@@ -715,7 +739,7 @@ class RestHelper:
("Cookie", "%s=%s" % (k, v)) for (k, v) in cookies.items()
],
)
- return channel
+ return channel, grant
def initiate_sso_login(
self, client_redirect_url: Optional[str], cookies: MutableMapping[str, str]
@@ -806,21 +830,3 @@ class RestHelper:
assert len(p.links) == 1, "not exactly one link in confirmation page"
oauth_uri = p.links[0]
return oauth_uri
-
-
-# an 'oidc_config' suitable for login_via_oidc.
-TEST_OIDC_AUTH_ENDPOINT = "https://issuer.test/auth"
-TEST_OIDC_TOKEN_ENDPOINT = "https://issuer.test/token"
-TEST_OIDC_USERINFO_ENDPOINT = "https://issuer.test/userinfo"
-TEST_OIDC_CONFIG = {
- "enabled": True,
- "discover": False,
- "issuer": "https://issuer.test",
- "client_id": "test-client-id",
- "client_secret": "test-client-secret",
- "scopes": ["profile"],
- "authorization_endpoint": TEST_OIDC_AUTH_ENDPOINT,
- "token_endpoint": TEST_OIDC_TOKEN_ENDPOINT,
- "userinfo_endpoint": TEST_OIDC_USERINFO_ENDPOINT,
- "user_mapping_provider": {"config": {"localpart_template": "{{ user.sub }}"}},
-}
|