summary refs log tree commit diff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/handlers/test_saml.py427
-rw-r--r--tests/rest/client/test_login.py74
-rw-r--r--tests/utils.py1
3 files changed, 2 insertions, 500 deletions
diff --git a/tests/handlers/test_saml.py b/tests/handlers/test_saml.py
deleted file mode 100644

index 1aca354826..0000000000 --- a/tests/handlers/test_saml.py +++ /dev/null
@@ -1,427 +0,0 @@ -# -# This file is licensed under the Affero General Public License (AGPL) version 3. -# -# Copyright 2020 The Matrix.org Foundation C.I.C. -# Copyright (C) 2023 New Vector, Ltd -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# See the GNU Affero General Public License for more details: -# <https://www.gnu.org/licenses/agpl-3.0.html>. -# -# Originally licensed under the Apache License, Version 2.0: -# <http://www.apache.org/licenses/LICENSE-2.0>. -# -# [This file includes modifications made by New Vector Limited] -# -# - -from typing import Any, Dict, Optional, Set, Tuple -from unittest.mock import AsyncMock, Mock - -import attr - -from twisted.test.proto_helpers import MemoryReactor - -from synapse.api.errors import RedirectException -from synapse.module_api import ModuleApi -from synapse.server import HomeServer -from synapse.types import JsonDict -from synapse.util import Clock - -from tests.unittest import HomeserverTestCase, override_config - -# Check if we have the dependencies to run the tests. -try: - import saml2.config - import saml2.response - from saml2.sigver import SigverError - - has_saml2 = True - - # pysaml2 can be installed and imported, but might not be able to find xmlsec1. - config = saml2.config.SPConfig() - try: - config.load({"metadata": {}}) - has_xmlsec1 = True - except SigverError: - has_xmlsec1 = False -except ImportError: - has_saml2 = False - has_xmlsec1 = False - -# These are a few constants that are used as config parameters in the tests. -BASE_URL = "https://synapse/" - - -@attr.s -class FakeAuthnResponse: - ava = attr.ib(type=dict) - assertions = attr.ib(type=list, factory=list) - in_response_to = attr.ib(type=Optional[str], default=None) - - -class TestMappingProvider: - def __init__(self, config: None, module: ModuleApi): - pass - - @staticmethod - def parse_config(config: JsonDict) -> None: - return None - - @staticmethod - def get_saml_attributes(config: None) -> Tuple[Set[str], Set[str]]: - return {"uid"}, {"displayName"} - - def get_remote_user_id( - self, saml_response: "saml2.response.AuthnResponse", client_redirect_url: str - ) -> str: - return saml_response.ava["uid"] - - def saml_response_to_user_attributes( - self, - saml_response: "saml2.response.AuthnResponse", - failures: int, - client_redirect_url: str, - ) -> dict: - localpart = saml_response.ava["username"] + (str(failures) if failures else "") - return {"mxid_localpart": localpart, "displayname": None} - - -class TestRedirectMappingProvider(TestMappingProvider): - def saml_response_to_user_attributes( - self, - saml_response: "saml2.response.AuthnResponse", - failures: int, - client_redirect_url: str, - ) -> dict: - raise RedirectException(b"https://custom-saml-redirect/") - - -class SamlHandlerTestCase(HomeserverTestCase): - def default_config(self) -> Dict[str, Any]: - config = super().default_config() - config["public_baseurl"] = BASE_URL - saml_config: Dict[str, Any] = { - "sp_config": {"metadata": {}}, - # Disable grandfathering. - "grandfathered_mxid_source_attribute": None, - "user_mapping_provider": {"module": __name__ + ".TestMappingProvider"}, - } - - # Update this config with what's in the default config so that - # override_config works as expected. - saml_config.update(config.get("saml2_config", {})) - config["saml2_config"] = saml_config - - return config - - def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver() - - self.handler = hs.get_saml_handler() - - # Reduce the number of attempts when generating MXIDs. - sso_handler = hs.get_sso_handler() - sso_handler._MAP_USERNAME_RETRIES = 3 - - return hs - - if not has_saml2: - skip = "Requires pysaml2" - elif not has_xmlsec1: - skip = "Requires xmlsec1" - - def test_map_saml_response_to_user(self) -> None: - """Ensure that mapping the SAML response returned from a provider to an MXID works properly.""" - - # stub out the auth handler - auth_handler = self.hs.get_auth_handler() - auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] - - # send a mocked-up SAML response to the callback - saml_response = FakeAuthnResponse({"uid": "test_user", "username": "test_user"}) - request = _mock_request() - self.get_success( - self.handler._handle_authn_response(request, saml_response, "redirect_uri") - ) - - # check that the auth handler got called as expected - auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", - "saml", - request, - "redirect_uri", - None, - new_user=True, - auth_provider_session_id=None, - ) - - @override_config({"saml2_config": {"grandfathered_mxid_source_attribute": "mxid"}}) - def test_map_saml_response_to_existing_user(self) -> None: - """Existing users can log in with SAML account.""" - store = self.hs.get_datastores().main - self.get_success( - store.register_user(user_id="@test_user:test", password_hash=None) - ) - - # stub out the auth handler - auth_handler = self.hs.get_auth_handler() - auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] - - # Map a user via SSO. - saml_response = FakeAuthnResponse( - {"uid": "tester", "mxid": ["test_user"], "username": "test_user"} - ) - request = _mock_request() - self.get_success( - self.handler._handle_authn_response(request, saml_response, "") - ) - - # check that the auth handler got called as expected - auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", - "saml", - request, - "", - None, - new_user=False, - auth_provider_session_id=None, - ) - - # Subsequent calls should map to the same mxid. - auth_handler.complete_sso_login.reset_mock() - self.get_success( - self.handler._handle_authn_response(request, saml_response, "") - ) - auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", - "saml", - request, - "", - None, - new_user=False, - auth_provider_session_id=None, - ) - - def test_map_saml_response_to_invalid_localpart(self) -> None: - """If the mapping provider generates an invalid localpart it should be rejected.""" - - # stub out the auth handler - auth_handler = self.hs.get_auth_handler() - auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] - - # mock out the error renderer too - sso_handler = self.hs.get_sso_handler() - sso_handler.render_error = Mock(return_value=None) # type: ignore[method-assign] - - saml_response = FakeAuthnResponse({"uid": "test", "username": "föö"}) - request = _mock_request() - self.get_success( - self.handler._handle_authn_response(request, saml_response, ""), - ) - sso_handler.render_error.assert_called_once_with( - request, "mapping_error", "localpart is invalid: föö" - ) - auth_handler.complete_sso_login.assert_not_called() - - def test_map_saml_response_to_user_retries(self) -> None: - """The mapping provider can retry generating an MXID if the MXID is already in use.""" - - # stub out the auth handler and error renderer - auth_handler = self.hs.get_auth_handler() - auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] - sso_handler = self.hs.get_sso_handler() - sso_handler.render_error = Mock(return_value=None) # type: ignore[method-assign] - - # register a user to occupy the first-choice MXID - store = self.hs.get_datastores().main - self.get_success( - store.register_user(user_id="@test_user:test", password_hash=None) - ) - - # send the fake SAML response - saml_response = FakeAuthnResponse({"uid": "test", "username": "test_user"}) - request = _mock_request() - self.get_success( - self.handler._handle_authn_response(request, saml_response, ""), - ) - - # test_user is already taken, so test_user1 gets registered instead. - auth_handler.complete_sso_login.assert_called_once_with( - "@test_user1:test", - "saml", - request, - "", - None, - new_user=True, - auth_provider_session_id=None, - ) - auth_handler.complete_sso_login.reset_mock() - - # Register all of the potential mxids for a particular SAML username. - self.get_success( - store.register_user(user_id="@tester:test", password_hash=None) - ) - for i in range(1, 3): - self.get_success( - store.register_user(user_id="@tester%d:test" % i, password_hash=None) - ) - - # Now attempt to map to a username, this will fail since all potential usernames are taken. - saml_response = FakeAuthnResponse({"uid": "tester", "username": "tester"}) - self.get_success( - self.handler._handle_authn_response(request, saml_response, ""), - ) - sso_handler.render_error.assert_called_once_with( - request, - "mapping_error", - "Unable to generate a Matrix ID from the SSO response", - ) - auth_handler.complete_sso_login.assert_not_called() - - @override_config( - { - "saml2_config": { - "user_mapping_provider": { - "module": __name__ + ".TestRedirectMappingProvider" - }, - } - } - ) - def test_map_saml_response_redirect(self) -> None: - """Test a mapping provider that raises a RedirectException""" - - saml_response = FakeAuthnResponse({"uid": "test", "username": "test_user"}) - request = _mock_request() - e = self.get_failure( - self.handler._handle_authn_response(request, saml_response, ""), - RedirectException, - ) - self.assertEqual(e.value.location, b"https://custom-saml-redirect/") - - @override_config( - { - "saml2_config": { - "attribute_requirements": [ - {"attribute": "userGroup", "value": "staff"}, - {"attribute": "department", "value": "sales"}, - ], - }, - } - ) - def test_attribute_requirements(self) -> None: - """The required attributes must be met from the SAML response.""" - - # stub out the auth handler - auth_handler = self.hs.get_auth_handler() - auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] - - # The response doesn't have the proper userGroup or department. - saml_response = FakeAuthnResponse({"uid": "test_user", "username": "test_user"}) - request = _mock_request() - self.get_success( - self.handler._handle_authn_response(request, saml_response, "redirect_uri") - ) - auth_handler.complete_sso_login.assert_not_called() - - # The response doesn't have the proper department. - saml_response = FakeAuthnResponse( - {"uid": "test_user", "username": "test_user", "userGroup": ["staff"]} - ) - request = _mock_request() - self.get_success( - self.handler._handle_authn_response(request, saml_response, "redirect_uri") - ) - auth_handler.complete_sso_login.assert_not_called() - - # Add the proper attributes and it should succeed. - saml_response = FakeAuthnResponse( - { - "uid": "test_user", - "username": "test_user", - "userGroup": ["staff", "admin"], - "department": ["sales"], - } - ) - request.reset_mock() - self.get_success( - self.handler._handle_authn_response(request, saml_response, "redirect_uri") - ) - - # check that the auth handler got called as expected - auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", - "saml", - request, - "redirect_uri", - None, - new_user=True, - auth_provider_session_id=None, - ) - - @override_config( - { - "saml2_config": { - "attribute_requirements": [ - {"attribute": "userGroup", "one_of": ["staff", "admin"]}, - ], - }, - } - ) - def test_attribute_requirements_one_of(self) -> None: - """The required attributes can be comma-separated.""" - - # stub out the auth handler - auth_handler = self.hs.get_auth_handler() - auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] - - # The response doesn't have the proper department. - saml_response = FakeAuthnResponse( - {"uid": "test_user", "username": "test_user", "userGroup": ["nogroup"]} - ) - request = _mock_request() - self.get_success( - self.handler._handle_authn_response(request, saml_response, "redirect_uri") - ) - auth_handler.complete_sso_login.assert_not_called() - - # Add the proper attributes and it should succeed. - saml_response = FakeAuthnResponse( - {"uid": "test_user", "username": "test_user", "userGroup": ["admin"]} - ) - request.reset_mock() - self.get_success( - self.handler._handle_authn_response(request, saml_response, "redirect_uri") - ) - - # check that the auth handler got called as expected - auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", - "saml", - request, - "redirect_uri", - None, - new_user=True, - auth_provider_session_id=None, - ) - - -def _mock_request() -> Mock: - """Returns a mock which will stand in as a SynapseRequest""" - mock = Mock( - spec=[ - "finish", - "getClientAddress", - "getHeader", - "setHeader", - "setResponseCode", - "write", - ] - ) - # `_disconnected` musn't be another `Mock`, otherwise it will be truthy. - mock._disconnected = False - return mock diff --git a/tests/rest/client/test_login.py b/tests/rest/client/test_login.py
index 0cc7f60921..a7e85fbbf1 100644 --- a/tests/rest/client/test_login.py +++ b/tests/rest/client/test_login.py
@@ -56,7 +56,6 @@ from synapse.util import Clock from tests import unittest from tests.handlers.test_oidc import HAS_OIDC -from tests.handlers.test_saml import has_saml2 from tests.rest.client.utils import TEST_OIDC_CONFIG from tests.server import FakeChannel from tests.test_utils.html_parsers import TestHtmlParser @@ -84,18 +83,6 @@ SYNAPSE_SERVER_PUBLIC_HOSTNAME = "synapse" # https://.... PUBLIC_BASEURL = "http://%s/" % (SYNAPSE_SERVER_PUBLIC_HOSTNAME,) -# just enough to tell pysaml2 where to redirect to -SAML_SERVER = "https://test.saml.server/idp/sso" -TEST_SAML_METADATA = """ -<md:EntityDescriptor xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata"> - <md:IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"> - <md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="%(SAML_SERVER)s"/> - </md:IDPSSODescriptor> -</md:EntityDescriptor> -""" % { - "SAML_SERVER": SAML_SERVER, -} - LOGIN_URL = b"/_matrix/client/r0/login" TEST_URL = b"/_matrix/client/r0/account/whoami" @@ -622,7 +609,7 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): ) -@skip_unless(has_saml2 and HAS_OIDC, "Requires SAML2 and OIDC") +@skip_unless(HAS_OIDC, "Requires OIDC") class MultiSSOTestCase(unittest.HomeserverTestCase): """Tests for homeservers with multiple SSO providers enabled""" @@ -635,14 +622,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase): config["public_baseurl"] = PUBLIC_BASEURL - config["saml2_config"] = { - "sp_config": { - "metadata": {"inline": [TEST_SAML_METADATA]}, - # use the XMLSecurity backend to avoid relying on xmlsec1 - "crypto_backend": "XMLSecurity", - }, - } - # default OIDC provider config["oidc_config"] = TEST_OIDC_CONFIG @@ -693,7 +672,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase): self.assertCountEqual( flows["m.login.sso"]["identity_providers"], [ - {"id": "saml", "name": "SAML"}, {"id": "oidc-idp1", "name": "IDP1"}, {"id": "oidc", "name": "OIDC"}, ], @@ -727,55 +705,7 @@ class MultiSSOTestCase(unittest.HomeserverTestCase): self.assertEqual(params["redirectUrl"], [TEST_CLIENT_REDIRECT_URL]) returned_idps.append(params["idp"][0]) - self.assertCountEqual(returned_idps, ["oidc", "oidc-idp1", "saml"]) - - def test_multi_sso_redirect_to_saml(self) -> None: - """If SAML is chosen, should redirect to the SAML server""" - channel = self.make_request( - "GET", - "/_synapse/client/pick_idp?redirectUrl=" - + urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL) - + "&idp=saml", - ) - self.assertEqual(channel.code, 302, channel.result) - location_headers = channel.headers.getRawHeaders("Location") - assert location_headers - sso_login_redirect_uri = location_headers[0] - - # it should redirect us to the standard login SSO redirect flow - self.assertEqual( - sso_login_redirect_uri, - self.login_sso_redirect_url_builder.build_login_sso_redirect_uri( - idp_id="saml", client_redirect_url=TEST_CLIENT_REDIRECT_URL - ), - ) - - # follow the redirect - channel = self.make_request( - "GET", - # We have to make this relative to be compatible with `make_request(...)` - get_relative_uri_from_absolute_uri(sso_login_redirect_uri), - # We have to set the Host header to match the `public_baseurl` to avoid - # the extra redirect in the `SsoRedirectServlet` in order for the - # cookies to be visible. - custom_headers=[ - ("Host", SYNAPSE_SERVER_PUBLIC_HOSTNAME), - ], - ) - - self.assertEqual(channel.code, 302, channel.result) - location_headers = channel.headers.getRawHeaders("Location") - assert location_headers - saml_uri = location_headers[0] - saml_uri_path, saml_uri_query = saml_uri.split("?", 1) - - # it should redirect us to the login page of the SAML server - self.assertEqual(saml_uri_path, SAML_SERVER) - - # the RelayState is used to carry the client redirect url - saml_uri_params = urllib.parse.parse_qs(saml_uri_query) - relay_state_param = saml_uri_params["RelayState"][0] - self.assertEqual(relay_state_param, TEST_CLIENT_REDIRECT_URL) + self.assertCountEqual(returned_idps, ["oidc", "oidc-idp1"]) def test_login_via_oidc(self) -> None: """If OIDC is chosen, should redirect to the OIDC auth endpoint""" diff --git a/tests/utils.py b/tests/utils.py
index 0006bd7a8d..5a57c015a9 100644 --- a/tests/utils.py +++ b/tests/utils.py
@@ -202,7 +202,6 @@ def default_config( }, "rc_3pid_validation": {"per_second": 10000, "burst_count": 10000}, "rc_presence": {"per_user": {"per_second": 10000, "burst_count": 10000}}, - "saml2_enabled": False, "public_baseurl": None, "default_identity_server": None, "key_refresh_interval": 24 * 60 * 60 * 1000,