diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py
index a81501979d..ff8e3c5cb6 100644
--- a/tests/handlers/test_oidc.py
+++ b/tests/handlers/test_oidc.py
@@ -57,6 +57,7 @@ CLIENT_ID = "test-client-id"
CLIENT_SECRET = "test-client-secret"
BASE_URL = "https://synapse/"
CALLBACK_URL = BASE_URL + "_synapse/client/oidc/callback"
+TEST_REDIRECT_URI = "https://test/oidc/callback"
SCOPES = ["openid"]
# config for common cases
@@ -70,12 +71,16 @@ DEFAULT_CONFIG = {
}
# extends the default config with explicit OAuth2 endpoints instead of using discovery
+#
+# We add "explicit" to things to make them different from the discovered values to make
+# sure that the explicit values override the discovered ones.
EXPLICIT_ENDPOINT_CONFIG = {
**DEFAULT_CONFIG,
"discover": False,
- "authorization_endpoint": ISSUER + "authorize",
- "token_endpoint": ISSUER + "token",
- "jwks_uri": ISSUER + "jwks",
+ "authorization_endpoint": ISSUER + "authorize-explicit",
+ "token_endpoint": ISSUER + "token-explicit",
+ "jwks_uri": ISSUER + "jwks-explicit",
+ "id_token_signing_alg_values_supported": ["RS256", "<explicit>"],
}
@@ -259,12 +264,64 @@ class OidcHandlerTestCase(HomeserverTestCase):
self.get_success(self.provider.load_metadata())
self.fake_server.get_metadata_handler.assert_not_called()
+ @override_config({"oidc_config": {**EXPLICIT_ENDPOINT_CONFIG, "discover": True}})
+ def test_discovery_with_explicit_config(self) -> None:
+ """
+ The handler should discover the endpoints from OIDC discovery document but
+ values are overriden by the explicit config.
+ """
+ # This would throw if some metadata were invalid
+ metadata = self.get_success(self.provider.load_metadata())
+ self.fake_server.get_metadata_handler.assert_called_once()
+
+ self.assertEqual(metadata.issuer, self.fake_server.issuer)
+ # It seems like authlib does not have that defined in its metadata models
+ self.assertEqual(
+ metadata.get("userinfo_endpoint"),
+ self.fake_server.userinfo_endpoint,
+ )
+
+ # Ensure the values are overridden correctly since these were configured
+ # explicitly
+ self.assertEqual(
+ metadata.authorization_endpoint,
+ EXPLICIT_ENDPOINT_CONFIG["authorization_endpoint"],
+ )
+ self.assertEqual(
+ metadata.token_endpoint, EXPLICIT_ENDPOINT_CONFIG["token_endpoint"]
+ )
+ self.assertEqual(metadata.jwks_uri, EXPLICIT_ENDPOINT_CONFIG["jwks_uri"])
+ self.assertEqual(
+ metadata.id_token_signing_alg_values_supported,
+ EXPLICIT_ENDPOINT_CONFIG["id_token_signing_alg_values_supported"],
+ )
+
+ # subsequent calls should be cached
+ self.reset_mocks()
+ self.get_success(self.provider.load_metadata())
+ self.fake_server.get_metadata_handler.assert_not_called()
+
@override_config({"oidc_config": EXPLICIT_ENDPOINT_CONFIG})
def test_no_discovery(self) -> None:
"""When discovery is disabled, it should not try to load from discovery document."""
- self.get_success(self.provider.load_metadata())
+ metadata = self.get_success(self.provider.load_metadata())
self.fake_server.get_metadata_handler.assert_not_called()
+ # Ensure the values are overridden correctly since these were configured
+ # explicitly
+ self.assertEqual(
+ metadata.authorization_endpoint,
+ EXPLICIT_ENDPOINT_CONFIG["authorization_endpoint"],
+ )
+ self.assertEqual(
+ metadata.token_endpoint, EXPLICIT_ENDPOINT_CONFIG["token_endpoint"]
+ )
+ self.assertEqual(metadata.jwks_uri, EXPLICIT_ENDPOINT_CONFIG["jwks_uri"])
+ self.assertEqual(
+ metadata.id_token_signing_alg_values_supported,
+ EXPLICIT_ENDPOINT_CONFIG["id_token_signing_alg_values_supported"],
+ )
+
@override_config({"oidc_config": DEFAULT_CONFIG})
def test_load_jwks(self) -> None:
"""JWKS loading is done once (then cached) if used."""
@@ -427,6 +484,32 @@ class OidcHandlerTestCase(HomeserverTestCase):
self.assertEqual(code_verifier, "")
self.assertEqual(redirect, "http://client/redirect")
+ @override_config(
+ {
+ "oidc_config": {
+ **DEFAULT_CONFIG,
+ "passthrough_authorization_parameters": ["additional_parameter"],
+ }
+ }
+ )
+ def test_passthrough_parameters(self) -> None:
+ """The redirect request has additional parameters, one is authorized, one is not"""
+ req = Mock(spec=["cookies", "args"])
+ req.cookies = []
+ req.args = {}
+ req.args[b"additional_parameter"] = ["a_value".encode("utf-8")]
+ req.args[b"not_authorized_parameter"] = ["any".encode("utf-8")]
+
+ url = urlparse(
+ self.get_success(
+ self.provider.handle_redirect_request(req, b"http://client/redirect")
+ )
+ )
+
+ params = parse_qs(url.query)
+ self.assertEqual(params["additional_parameter"], ["a_value"])
+ self.assertNotIn("not_authorized_parameters", params)
+
@override_config({"oidc_config": DEFAULT_CONFIG})
def test_redirect_request_with_code_challenge(self) -> None:
"""The redirect request has the right arguments & generates a valid session cookie."""
@@ -530,6 +613,24 @@ class OidcHandlerTestCase(HomeserverTestCase):
code_verifier = get_value_from_macaroon(macaroon, "code_verifier")
self.assertEqual(code_verifier, "")
+ @override_config(
+ {"oidc_config": {**DEFAULT_CONFIG, "redirect_uri": TEST_REDIRECT_URI}}
+ )
+ def test_redirect_request_with_overridden_redirect_uri(self) -> None:
+ """The authorization endpoint redirect has the overridden `redirect_uri` value."""
+ req = Mock(spec=["cookies"])
+ req.cookies = []
+
+ url = urlparse(
+ self.get_success(
+ self.provider.handle_redirect_request(req, b"http://client/redirect")
+ )
+ )
+
+ # Ensure that the redirect_uri in the returned url has been overridden.
+ params = parse_qs(url.query)
+ self.assertEqual(params["redirect_uri"], [TEST_REDIRECT_URI])
+
@override_config({"oidc_config": DEFAULT_CONFIG})
def test_callback_error(self) -> None:
"""Errors from the provider returned in the callback are displayed."""
@@ -901,6 +1002,81 @@ class OidcHandlerTestCase(HomeserverTestCase):
{
"oidc_config": {
**DEFAULT_CONFIG,
+ "redirect_uri": TEST_REDIRECT_URI,
+ }
+ }
+ )
+ def test_code_exchange_with_overridden_redirect_uri(self) -> None:
+ """Code exchange behaves correctly and handles various error scenarios."""
+ # Set up a fake IdP with a token endpoint handler.
+ token = {
+ "type": "Bearer",
+ "access_token": "aabbcc",
+ }
+
+ self.fake_server.post_token_handler.side_effect = None
+ self.fake_server.post_token_handler.return_value = FakeResponse.json(
+ payload=token
+ )
+ code = "code"
+
+ # Exchange the code against the fake IdP.
+ self.get_success(self.provider._exchange_code(code, code_verifier=""))
+
+ # Check that the `redirect_uri` parameter provided matches our
+ # overridden config value.
+ kwargs = self.fake_server.request.call_args[1]
+ args = parse_qs(kwargs["data"].decode("utf-8"))
+ self.assertEqual(args["redirect_uri"], [TEST_REDIRECT_URI])
+
+ @override_config(
+ {
+ "oidc_config": {
+ **DEFAULT_CONFIG,
+ "redirect_uri": TEST_REDIRECT_URI,
+ }
+ }
+ )
+ def test_code_exchange_ignores_access_token(self) -> None:
+ """
+ Code exchange completes successfully and doesn't validate the `at_hash`
+ (access token hash) field of an ID token when the access token isn't
+ going to be used.
+
+ The access token won't be used in this test because Synapse (currently)
+ only needs it to fetch a user's metadata if it isn't included in the ID
+ token itself.
+
+ Because we have included "openid" in the requested scopes for this IdP
+ (see `SCOPES`), user metadata is be included in the ID token. Thus the
+ access token isn't needed, and it's unnecessary for Synapse to validate
+ the access token.
+
+ This is a regression test for a situation where an upstream identity
+ provider was providing an invalid `at_hash` value, which Synapse errored
+ on, yet Synapse wasn't using the access token for anything.
+ """
+ # Exchange the code against the fake IdP.
+ userinfo = {
+ "sub": "foo",
+ "username": "foo",
+ "phone": "1234567",
+ }
+ with self.fake_server.id_token_override(
+ {
+ "at_hash": "invalid-hash",
+ }
+ ):
+ request, _ = self.start_authorization(userinfo)
+ self.get_success(self.handler.handle_oidc_callback(request))
+
+ # If no error was rendered, then we have success.
+ self.render_error.assert_not_called()
+
+ @override_config(
+ {
+ "oidc_config": {
+ **DEFAULT_CONFIG,
"user_mapping_provider": {
"module": __name__ + ".TestMappingProviderExtra"
},
@@ -1271,6 +1447,113 @@ class OidcHandlerTestCase(HomeserverTestCase):
{
"oidc_config": {
**DEFAULT_CONFIG,
+ "attribute_requirements": [
+ {"attribute": "test", "one_of": ["foo", "bar"]}
+ ],
+ }
+ }
+ )
+ def test_attribute_requirements_one_of_succeeds(self) -> None:
+ """Test that auth succeeds if userinfo attribute has multiple values and CONTAINS required value"""
+ # userinfo with "test": ["bar"] attribute should succeed.
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ "test": ["bar"],
+ }
+ request, _ = self.start_authorization(userinfo)
+ self.get_success(self.handler.handle_oidc_callback(request))
+
+ # check that the auth handler got called as expected
+ self.complete_sso_login.assert_called_once_with(
+ "@tester:test",
+ self.provider.idp_id,
+ request,
+ ANY,
+ None,
+ new_user=True,
+ auth_provider_session_id=None,
+ )
+
+ @override_config(
+ {
+ "oidc_config": {
+ **DEFAULT_CONFIG,
+ "attribute_requirements": [
+ {"attribute": "test", "one_of": ["foo", "bar"]}
+ ],
+ }
+ }
+ )
+ def test_attribute_requirements_one_of_fails(self) -> None:
+ """Test that auth fails if userinfo attribute has multiple values yet
+ DOES NOT CONTAIN a required value
+ """
+ # userinfo with "test": ["something else"] attribute should fail.
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ "test": ["something else"],
+ }
+ request, _ = self.start_authorization(userinfo)
+ self.get_success(self.handler.handle_oidc_callback(request))
+ self.complete_sso_login.assert_not_called()
+
+ @override_config(
+ {
+ "oidc_config": {
+ **DEFAULT_CONFIG,
+ "attribute_requirements": [{"attribute": "test"}],
+ }
+ }
+ )
+ def test_attribute_requirements_does_not_exist(self) -> None:
+ """OIDC login fails if the required attribute does not exist in the OIDC userinfo response."""
+ # userinfo lacking "test" attribute should fail.
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ }
+ request, _ = self.start_authorization(userinfo)
+ self.get_success(self.handler.handle_oidc_callback(request))
+ self.complete_sso_login.assert_not_called()
+
+ @override_config(
+ {
+ "oidc_config": {
+ **DEFAULT_CONFIG,
+ "attribute_requirements": [{"attribute": "test"}],
+ }
+ }
+ )
+ def test_attribute_requirements_exist(self) -> None:
+ """OIDC login succeeds if the required attribute exist (regardless of value)
+ in the OIDC userinfo response.
+ """
+ # userinfo with "test" attribute and random value should succeed.
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ "test": random_string(5), # value does not matter
+ }
+ request, _ = self.start_authorization(userinfo)
+ self.get_success(self.handler.handle_oidc_callback(request))
+
+ # check that the auth handler got called as expected
+ self.complete_sso_login.assert_called_once_with(
+ "@tester:test",
+ self.provider.idp_id,
+ request,
+ ANY,
+ None,
+ new_user=True,
+ auth_provider_session_id=None,
+ )
+
+ @override_config(
+ {
+ "oidc_config": {
+ **DEFAULT_CONFIG,
"attribute_requirements": [{"attribute": "test", "value": "foobar"}],
}
}
|