summary refs log tree commit diff
path: root/tests/handlers/test_oidc.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tests/handlers/test_oidc.py291
1 files changed, 287 insertions, 4 deletions
diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py

index a81501979d..ff8e3c5cb6 100644 --- a/tests/handlers/test_oidc.py +++ b/tests/handlers/test_oidc.py
@@ -57,6 +57,7 @@ CLIENT_ID = "test-client-id" CLIENT_SECRET = "test-client-secret" BASE_URL = "https://synapse/" CALLBACK_URL = BASE_URL + "_synapse/client/oidc/callback" +TEST_REDIRECT_URI = "https://test/oidc/callback" SCOPES = ["openid"] # config for common cases @@ -70,12 +71,16 @@ DEFAULT_CONFIG = { } # extends the default config with explicit OAuth2 endpoints instead of using discovery +# +# We add "explicit" to things to make them different from the discovered values to make +# sure that the explicit values override the discovered ones. EXPLICIT_ENDPOINT_CONFIG = { **DEFAULT_CONFIG, "discover": False, - "authorization_endpoint": ISSUER + "authorize", - "token_endpoint": ISSUER + "token", - "jwks_uri": ISSUER + "jwks", + "authorization_endpoint": ISSUER + "authorize-explicit", + "token_endpoint": ISSUER + "token-explicit", + "jwks_uri": ISSUER + "jwks-explicit", + "id_token_signing_alg_values_supported": ["RS256", "<explicit>"], } @@ -259,12 +264,64 @@ class OidcHandlerTestCase(HomeserverTestCase): self.get_success(self.provider.load_metadata()) self.fake_server.get_metadata_handler.assert_not_called() + @override_config({"oidc_config": {**EXPLICIT_ENDPOINT_CONFIG, "discover": True}}) + def test_discovery_with_explicit_config(self) -> None: + """ + The handler should discover the endpoints from OIDC discovery document but + values are overriden by the explicit config. + """ + # This would throw if some metadata were invalid + metadata = self.get_success(self.provider.load_metadata()) + self.fake_server.get_metadata_handler.assert_called_once() + + self.assertEqual(metadata.issuer, self.fake_server.issuer) + # It seems like authlib does not have that defined in its metadata models + self.assertEqual( + metadata.get("userinfo_endpoint"), + self.fake_server.userinfo_endpoint, + ) + + # Ensure the values are overridden correctly since these were configured + # explicitly + self.assertEqual( + metadata.authorization_endpoint, + EXPLICIT_ENDPOINT_CONFIG["authorization_endpoint"], + ) + self.assertEqual( + metadata.token_endpoint, EXPLICIT_ENDPOINT_CONFIG["token_endpoint"] + ) + self.assertEqual(metadata.jwks_uri, EXPLICIT_ENDPOINT_CONFIG["jwks_uri"]) + self.assertEqual( + metadata.id_token_signing_alg_values_supported, + EXPLICIT_ENDPOINT_CONFIG["id_token_signing_alg_values_supported"], + ) + + # subsequent calls should be cached + self.reset_mocks() + self.get_success(self.provider.load_metadata()) + self.fake_server.get_metadata_handler.assert_not_called() + @override_config({"oidc_config": EXPLICIT_ENDPOINT_CONFIG}) def test_no_discovery(self) -> None: """When discovery is disabled, it should not try to load from discovery document.""" - self.get_success(self.provider.load_metadata()) + metadata = self.get_success(self.provider.load_metadata()) self.fake_server.get_metadata_handler.assert_not_called() + # Ensure the values are overridden correctly since these were configured + # explicitly + self.assertEqual( + metadata.authorization_endpoint, + EXPLICIT_ENDPOINT_CONFIG["authorization_endpoint"], + ) + self.assertEqual( + metadata.token_endpoint, EXPLICIT_ENDPOINT_CONFIG["token_endpoint"] + ) + self.assertEqual(metadata.jwks_uri, EXPLICIT_ENDPOINT_CONFIG["jwks_uri"]) + self.assertEqual( + metadata.id_token_signing_alg_values_supported, + EXPLICIT_ENDPOINT_CONFIG["id_token_signing_alg_values_supported"], + ) + @override_config({"oidc_config": DEFAULT_CONFIG}) def test_load_jwks(self) -> None: """JWKS loading is done once (then cached) if used.""" @@ -427,6 +484,32 @@ class OidcHandlerTestCase(HomeserverTestCase): self.assertEqual(code_verifier, "") self.assertEqual(redirect, "http://client/redirect") + @override_config( + { + "oidc_config": { + **DEFAULT_CONFIG, + "passthrough_authorization_parameters": ["additional_parameter"], + } + } + ) + def test_passthrough_parameters(self) -> None: + """The redirect request has additional parameters, one is authorized, one is not""" + req = Mock(spec=["cookies", "args"]) + req.cookies = [] + req.args = {} + req.args[b"additional_parameter"] = ["a_value".encode("utf-8")] + req.args[b"not_authorized_parameter"] = ["any".encode("utf-8")] + + url = urlparse( + self.get_success( + self.provider.handle_redirect_request(req, b"http://client/redirect") + ) + ) + + params = parse_qs(url.query) + self.assertEqual(params["additional_parameter"], ["a_value"]) + self.assertNotIn("not_authorized_parameters", params) + @override_config({"oidc_config": DEFAULT_CONFIG}) def test_redirect_request_with_code_challenge(self) -> None: """The redirect request has the right arguments & generates a valid session cookie.""" @@ -530,6 +613,24 @@ class OidcHandlerTestCase(HomeserverTestCase): code_verifier = get_value_from_macaroon(macaroon, "code_verifier") self.assertEqual(code_verifier, "") + @override_config( + {"oidc_config": {**DEFAULT_CONFIG, "redirect_uri": TEST_REDIRECT_URI}} + ) + def test_redirect_request_with_overridden_redirect_uri(self) -> None: + """The authorization endpoint redirect has the overridden `redirect_uri` value.""" + req = Mock(spec=["cookies"]) + req.cookies = [] + + url = urlparse( + self.get_success( + self.provider.handle_redirect_request(req, b"http://client/redirect") + ) + ) + + # Ensure that the redirect_uri in the returned url has been overridden. + params = parse_qs(url.query) + self.assertEqual(params["redirect_uri"], [TEST_REDIRECT_URI]) + @override_config({"oidc_config": DEFAULT_CONFIG}) def test_callback_error(self) -> None: """Errors from the provider returned in the callback are displayed.""" @@ -901,6 +1002,81 @@ class OidcHandlerTestCase(HomeserverTestCase): { "oidc_config": { **DEFAULT_CONFIG, + "redirect_uri": TEST_REDIRECT_URI, + } + } + ) + def test_code_exchange_with_overridden_redirect_uri(self) -> None: + """Code exchange behaves correctly and handles various error scenarios.""" + # Set up a fake IdP with a token endpoint handler. + token = { + "type": "Bearer", + "access_token": "aabbcc", + } + + self.fake_server.post_token_handler.side_effect = None + self.fake_server.post_token_handler.return_value = FakeResponse.json( + payload=token + ) + code = "code" + + # Exchange the code against the fake IdP. + self.get_success(self.provider._exchange_code(code, code_verifier="")) + + # Check that the `redirect_uri` parameter provided matches our + # overridden config value. + kwargs = self.fake_server.request.call_args[1] + args = parse_qs(kwargs["data"].decode("utf-8")) + self.assertEqual(args["redirect_uri"], [TEST_REDIRECT_URI]) + + @override_config( + { + "oidc_config": { + **DEFAULT_CONFIG, + "redirect_uri": TEST_REDIRECT_URI, + } + } + ) + def test_code_exchange_ignores_access_token(self) -> None: + """ + Code exchange completes successfully and doesn't validate the `at_hash` + (access token hash) field of an ID token when the access token isn't + going to be used. + + The access token won't be used in this test because Synapse (currently) + only needs it to fetch a user's metadata if it isn't included in the ID + token itself. + + Because we have included "openid" in the requested scopes for this IdP + (see `SCOPES`), user metadata is be included in the ID token. Thus the + access token isn't needed, and it's unnecessary for Synapse to validate + the access token. + + This is a regression test for a situation where an upstream identity + provider was providing an invalid `at_hash` value, which Synapse errored + on, yet Synapse wasn't using the access token for anything. + """ + # Exchange the code against the fake IdP. + userinfo = { + "sub": "foo", + "username": "foo", + "phone": "1234567", + } + with self.fake_server.id_token_override( + { + "at_hash": "invalid-hash", + } + ): + request, _ = self.start_authorization(userinfo) + self.get_success(self.handler.handle_oidc_callback(request)) + + # If no error was rendered, then we have success. + self.render_error.assert_not_called() + + @override_config( + { + "oidc_config": { + **DEFAULT_CONFIG, "user_mapping_provider": { "module": __name__ + ".TestMappingProviderExtra" }, @@ -1271,6 +1447,113 @@ class OidcHandlerTestCase(HomeserverTestCase): { "oidc_config": { **DEFAULT_CONFIG, + "attribute_requirements": [ + {"attribute": "test", "one_of": ["foo", "bar"]} + ], + } + } + ) + def test_attribute_requirements_one_of_succeeds(self) -> None: + """Test that auth succeeds if userinfo attribute has multiple values and CONTAINS required value""" + # userinfo with "test": ["bar"] attribute should succeed. + userinfo = { + "sub": "tester", + "username": "tester", + "test": ["bar"], + } + request, _ = self.start_authorization(userinfo) + self.get_success(self.handler.handle_oidc_callback(request)) + + # check that the auth handler got called as expected + self.complete_sso_login.assert_called_once_with( + "@tester:test", + self.provider.idp_id, + request, + ANY, + None, + new_user=True, + auth_provider_session_id=None, + ) + + @override_config( + { + "oidc_config": { + **DEFAULT_CONFIG, + "attribute_requirements": [ + {"attribute": "test", "one_of": ["foo", "bar"]} + ], + } + } + ) + def test_attribute_requirements_one_of_fails(self) -> None: + """Test that auth fails if userinfo attribute has multiple values yet + DOES NOT CONTAIN a required value + """ + # userinfo with "test": ["something else"] attribute should fail. + userinfo = { + "sub": "tester", + "username": "tester", + "test": ["something else"], + } + request, _ = self.start_authorization(userinfo) + self.get_success(self.handler.handle_oidc_callback(request)) + self.complete_sso_login.assert_not_called() + + @override_config( + { + "oidc_config": { + **DEFAULT_CONFIG, + "attribute_requirements": [{"attribute": "test"}], + } + } + ) + def test_attribute_requirements_does_not_exist(self) -> None: + """OIDC login fails if the required attribute does not exist in the OIDC userinfo response.""" + # userinfo lacking "test" attribute should fail. + userinfo = { + "sub": "tester", + "username": "tester", + } + request, _ = self.start_authorization(userinfo) + self.get_success(self.handler.handle_oidc_callback(request)) + self.complete_sso_login.assert_not_called() + + @override_config( + { + "oidc_config": { + **DEFAULT_CONFIG, + "attribute_requirements": [{"attribute": "test"}], + } + } + ) + def test_attribute_requirements_exist(self) -> None: + """OIDC login succeeds if the required attribute exist (regardless of value) + in the OIDC userinfo response. + """ + # userinfo with "test" attribute and random value should succeed. + userinfo = { + "sub": "tester", + "username": "tester", + "test": random_string(5), # value does not matter + } + request, _ = self.start_authorization(userinfo) + self.get_success(self.handler.handle_oidc_callback(request)) + + # check that the auth handler got called as expected + self.complete_sso_login.assert_called_once_with( + "@tester:test", + self.provider.idp_id, + request, + ANY, + None, + new_user=True, + auth_provider_session_id=None, + ) + + @override_config( + { + "oidc_config": { + **DEFAULT_CONFIG, "attribute_requirements": [{"attribute": "test", "value": "foobar"}], } }