diff --git a/tests/handlers/test_saml.py b/tests/handlers/test_saml.py
deleted file mode 100644
index 1aca354826..0000000000
--- a/tests/handlers/test_saml.py
+++ /dev/null
@@ -1,427 +0,0 @@
-#
-# This file is licensed under the Affero General Public License (AGPL) version 3.
-#
-# Copyright 2020 The Matrix.org Foundation C.I.C.
-# Copyright (C) 2023 New Vector, Ltd
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# See the GNU Affero General Public License for more details:
-# <https://www.gnu.org/licenses/agpl-3.0.html>.
-#
-# Originally licensed under the Apache License, Version 2.0:
-# <http://www.apache.org/licenses/LICENSE-2.0>.
-#
-# [This file includes modifications made by New Vector Limited]
-#
-#
-
-from typing import Any, Dict, Optional, Set, Tuple
-from unittest.mock import AsyncMock, Mock
-
-import attr
-
-from twisted.test.proto_helpers import MemoryReactor
-
-from synapse.api.errors import RedirectException
-from synapse.module_api import ModuleApi
-from synapse.server import HomeServer
-from synapse.types import JsonDict
-from synapse.util import Clock
-
-from tests.unittest import HomeserverTestCase, override_config
-
-# Check if we have the dependencies to run the tests.
-try:
- import saml2.config
- import saml2.response
- from saml2.sigver import SigverError
-
- has_saml2 = True
-
- # pysaml2 can be installed and imported, but might not be able to find xmlsec1.
- config = saml2.config.SPConfig()
- try:
- config.load({"metadata": {}})
- has_xmlsec1 = True
- except SigverError:
- has_xmlsec1 = False
-except ImportError:
- has_saml2 = False
- has_xmlsec1 = False
-
-# These are a few constants that are used as config parameters in the tests.
-BASE_URL = "https://synapse/"
-
-
-@attr.s
-class FakeAuthnResponse:
- ava = attr.ib(type=dict)
- assertions = attr.ib(type=list, factory=list)
- in_response_to = attr.ib(type=Optional[str], default=None)
-
-
-class TestMappingProvider:
- def __init__(self, config: None, module: ModuleApi):
- pass
-
- @staticmethod
- def parse_config(config: JsonDict) -> None:
- return None
-
- @staticmethod
- def get_saml_attributes(config: None) -> Tuple[Set[str], Set[str]]:
- return {"uid"}, {"displayName"}
-
- def get_remote_user_id(
- self, saml_response: "saml2.response.AuthnResponse", client_redirect_url: str
- ) -> str:
- return saml_response.ava["uid"]
-
- def saml_response_to_user_attributes(
- self,
- saml_response: "saml2.response.AuthnResponse",
- failures: int,
- client_redirect_url: str,
- ) -> dict:
- localpart = saml_response.ava["username"] + (str(failures) if failures else "")
- return {"mxid_localpart": localpart, "displayname": None}
-
-
-class TestRedirectMappingProvider(TestMappingProvider):
- def saml_response_to_user_attributes(
- self,
- saml_response: "saml2.response.AuthnResponse",
- failures: int,
- client_redirect_url: str,
- ) -> dict:
- raise RedirectException(b"https://custom-saml-redirect/")
-
-
-class SamlHandlerTestCase(HomeserverTestCase):
- def default_config(self) -> Dict[str, Any]:
- config = super().default_config()
- config["public_baseurl"] = BASE_URL
- saml_config: Dict[str, Any] = {
- "sp_config": {"metadata": {}},
- # Disable grandfathering.
- "grandfathered_mxid_source_attribute": None,
- "user_mapping_provider": {"module": __name__ + ".TestMappingProvider"},
- }
-
- # Update this config with what's in the default config so that
- # override_config works as expected.
- saml_config.update(config.get("saml2_config", {}))
- config["saml2_config"] = saml_config
-
- return config
-
- def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- hs = self.setup_test_homeserver()
-
- self.handler = hs.get_saml_handler()
-
- # Reduce the number of attempts when generating MXIDs.
- sso_handler = hs.get_sso_handler()
- sso_handler._MAP_USERNAME_RETRIES = 3
-
- return hs
-
- if not has_saml2:
- skip = "Requires pysaml2"
- elif not has_xmlsec1:
- skip = "Requires xmlsec1"
-
- def test_map_saml_response_to_user(self) -> None:
- """Ensure that mapping the SAML response returned from a provider to an MXID works properly."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- # send a mocked-up SAML response to the callback
- saml_response = FakeAuthnResponse({"uid": "test_user", "username": "test_user"})
- request = _mock_request()
- self.get_success(
- self.handler._handle_authn_response(request, saml_response, "redirect_uri")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "saml",
- request,
- "redirect_uri",
- None,
- new_user=True,
- auth_provider_session_id=None,
- )
-
- @override_config({"saml2_config": {"grandfathered_mxid_source_attribute": "mxid"}})
- def test_map_saml_response_to_existing_user(self) -> None:
- """Existing users can log in with SAML account."""
- store = self.hs.get_datastores().main
- self.get_success(
- store.register_user(user_id="@test_user:test", password_hash=None)
- )
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- # Map a user via SSO.
- saml_response = FakeAuthnResponse(
- {"uid": "tester", "mxid": ["test_user"], "username": "test_user"}
- )
- request = _mock_request()
- self.get_success(
- self.handler._handle_authn_response(request, saml_response, "")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "saml",
- request,
- "",
- None,
- new_user=False,
- auth_provider_session_id=None,
- )
-
- # Subsequent calls should map to the same mxid.
- auth_handler.complete_sso_login.reset_mock()
- self.get_success(
- self.handler._handle_authn_response(request, saml_response, "")
- )
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "saml",
- request,
- "",
- None,
- new_user=False,
- auth_provider_session_id=None,
- )
-
- def test_map_saml_response_to_invalid_localpart(self) -> None:
- """If the mapping provider generates an invalid localpart it should be rejected."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- # mock out the error renderer too
- sso_handler = self.hs.get_sso_handler()
- sso_handler.render_error = Mock(return_value=None) # type: ignore[method-assign]
-
- saml_response = FakeAuthnResponse({"uid": "test", "username": "föö"})
- request = _mock_request()
- self.get_success(
- self.handler._handle_authn_response(request, saml_response, ""),
- )
- sso_handler.render_error.assert_called_once_with(
- request, "mapping_error", "localpart is invalid: föö"
- )
- auth_handler.complete_sso_login.assert_not_called()
-
- def test_map_saml_response_to_user_retries(self) -> None:
- """The mapping provider can retry generating an MXID if the MXID is already in use."""
-
- # stub out the auth handler and error renderer
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
- sso_handler = self.hs.get_sso_handler()
- sso_handler.render_error = Mock(return_value=None) # type: ignore[method-assign]
-
- # register a user to occupy the first-choice MXID
- store = self.hs.get_datastores().main
- self.get_success(
- store.register_user(user_id="@test_user:test", password_hash=None)
- )
-
- # send the fake SAML response
- saml_response = FakeAuthnResponse({"uid": "test", "username": "test_user"})
- request = _mock_request()
- self.get_success(
- self.handler._handle_authn_response(request, saml_response, ""),
- )
-
- # test_user is already taken, so test_user1 gets registered instead.
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user1:test",
- "saml",
- request,
- "",
- None,
- new_user=True,
- auth_provider_session_id=None,
- )
- auth_handler.complete_sso_login.reset_mock()
-
- # Register all of the potential mxids for a particular SAML username.
- self.get_success(
- store.register_user(user_id="@tester:test", password_hash=None)
- )
- for i in range(1, 3):
- self.get_success(
- store.register_user(user_id="@tester%d:test" % i, password_hash=None)
- )
-
- # Now attempt to map to a username, this will fail since all potential usernames are taken.
- saml_response = FakeAuthnResponse({"uid": "tester", "username": "tester"})
- self.get_success(
- self.handler._handle_authn_response(request, saml_response, ""),
- )
- sso_handler.render_error.assert_called_once_with(
- request,
- "mapping_error",
- "Unable to generate a Matrix ID from the SSO response",
- )
- auth_handler.complete_sso_login.assert_not_called()
-
- @override_config(
- {
- "saml2_config": {
- "user_mapping_provider": {
- "module": __name__ + ".TestRedirectMappingProvider"
- },
- }
- }
- )
- def test_map_saml_response_redirect(self) -> None:
- """Test a mapping provider that raises a RedirectException"""
-
- saml_response = FakeAuthnResponse({"uid": "test", "username": "test_user"})
- request = _mock_request()
- e = self.get_failure(
- self.handler._handle_authn_response(request, saml_response, ""),
- RedirectException,
- )
- self.assertEqual(e.value.location, b"https://custom-saml-redirect/")
-
- @override_config(
- {
- "saml2_config": {
- "attribute_requirements": [
- {"attribute": "userGroup", "value": "staff"},
- {"attribute": "department", "value": "sales"},
- ],
- },
- }
- )
- def test_attribute_requirements(self) -> None:
- """The required attributes must be met from the SAML response."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- # The response doesn't have the proper userGroup or department.
- saml_response = FakeAuthnResponse({"uid": "test_user", "username": "test_user"})
- request = _mock_request()
- self.get_success(
- self.handler._handle_authn_response(request, saml_response, "redirect_uri")
- )
- auth_handler.complete_sso_login.assert_not_called()
-
- # The response doesn't have the proper department.
- saml_response = FakeAuthnResponse(
- {"uid": "test_user", "username": "test_user", "userGroup": ["staff"]}
- )
- request = _mock_request()
- self.get_success(
- self.handler._handle_authn_response(request, saml_response, "redirect_uri")
- )
- auth_handler.complete_sso_login.assert_not_called()
-
- # Add the proper attributes and it should succeed.
- saml_response = FakeAuthnResponse(
- {
- "uid": "test_user",
- "username": "test_user",
- "userGroup": ["staff", "admin"],
- "department": ["sales"],
- }
- )
- request.reset_mock()
- self.get_success(
- self.handler._handle_authn_response(request, saml_response, "redirect_uri")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "saml",
- request,
- "redirect_uri",
- None,
- new_user=True,
- auth_provider_session_id=None,
- )
-
- @override_config(
- {
- "saml2_config": {
- "attribute_requirements": [
- {"attribute": "userGroup", "one_of": ["staff", "admin"]},
- ],
- },
- }
- )
- def test_attribute_requirements_one_of(self) -> None:
- """The required attributes can be comma-separated."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- # The response doesn't have the proper department.
- saml_response = FakeAuthnResponse(
- {"uid": "test_user", "username": "test_user", "userGroup": ["nogroup"]}
- )
- request = _mock_request()
- self.get_success(
- self.handler._handle_authn_response(request, saml_response, "redirect_uri")
- )
- auth_handler.complete_sso_login.assert_not_called()
-
- # Add the proper attributes and it should succeed.
- saml_response = FakeAuthnResponse(
- {"uid": "test_user", "username": "test_user", "userGroup": ["admin"]}
- )
- request.reset_mock()
- self.get_success(
- self.handler._handle_authn_response(request, saml_response, "redirect_uri")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "saml",
- request,
- "redirect_uri",
- None,
- new_user=True,
- auth_provider_session_id=None,
- )
-
-
-def _mock_request() -> Mock:
- """Returns a mock which will stand in as a SynapseRequest"""
- mock = Mock(
- spec=[
- "finish",
- "getClientAddress",
- "getHeader",
- "setHeader",
- "setResponseCode",
- "write",
- ]
- )
- # `_disconnected` musn't be another `Mock`, otherwise it will be truthy.
- mock._disconnected = False
- return mock
diff --git a/tests/rest/client/test_login.py b/tests/rest/client/test_login.py
index 0cc7f60921..a7e85fbbf1 100644
--- a/tests/rest/client/test_login.py
+++ b/tests/rest/client/test_login.py
@@ -56,7 +56,6 @@ from synapse.util import Clock
from tests import unittest
from tests.handlers.test_oidc import HAS_OIDC
-from tests.handlers.test_saml import has_saml2
from tests.rest.client.utils import TEST_OIDC_CONFIG
from tests.server import FakeChannel
from tests.test_utils.html_parsers import TestHtmlParser
@@ -84,18 +83,6 @@ SYNAPSE_SERVER_PUBLIC_HOSTNAME = "synapse"
# https://....
PUBLIC_BASEURL = "http://%s/" % (SYNAPSE_SERVER_PUBLIC_HOSTNAME,)
-# just enough to tell pysaml2 where to redirect to
-SAML_SERVER = "https://test.saml.server/idp/sso"
-TEST_SAML_METADATA = """
-<md:EntityDescriptor xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata">
- <md:IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
- <md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="%(SAML_SERVER)s"/>
- </md:IDPSSODescriptor>
-</md:EntityDescriptor>
-""" % {
- "SAML_SERVER": SAML_SERVER,
-}
-
LOGIN_URL = b"/_matrix/client/r0/login"
TEST_URL = b"/_matrix/client/r0/account/whoami"
@@ -622,7 +609,7 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase):
)
-@skip_unless(has_saml2 and HAS_OIDC, "Requires SAML2 and OIDC")
+@skip_unless(HAS_OIDC, "Requires OIDC")
class MultiSSOTestCase(unittest.HomeserverTestCase):
"""Tests for homeservers with multiple SSO providers enabled"""
@@ -635,14 +622,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
config["public_baseurl"] = PUBLIC_BASEURL
- config["saml2_config"] = {
- "sp_config": {
- "metadata": {"inline": [TEST_SAML_METADATA]},
- # use the XMLSecurity backend to avoid relying on xmlsec1
- "crypto_backend": "XMLSecurity",
- },
- }
-
# default OIDC provider
config["oidc_config"] = TEST_OIDC_CONFIG
@@ -693,7 +672,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
self.assertCountEqual(
flows["m.login.sso"]["identity_providers"],
[
- {"id": "saml", "name": "SAML"},
{"id": "oidc-idp1", "name": "IDP1"},
{"id": "oidc", "name": "OIDC"},
],
@@ -727,55 +705,7 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
self.assertEqual(params["redirectUrl"], [TEST_CLIENT_REDIRECT_URL])
returned_idps.append(params["idp"][0])
- self.assertCountEqual(returned_idps, ["oidc", "oidc-idp1", "saml"])
-
- def test_multi_sso_redirect_to_saml(self) -> None:
- """If SAML is chosen, should redirect to the SAML server"""
- channel = self.make_request(
- "GET",
- "/_synapse/client/pick_idp?redirectUrl="
- + urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL)
- + "&idp=saml",
- )
- self.assertEqual(channel.code, 302, channel.result)
- location_headers = channel.headers.getRawHeaders("Location")
- assert location_headers
- sso_login_redirect_uri = location_headers[0]
-
- # it should redirect us to the standard login SSO redirect flow
- self.assertEqual(
- sso_login_redirect_uri,
- self.login_sso_redirect_url_builder.build_login_sso_redirect_uri(
- idp_id="saml", client_redirect_url=TEST_CLIENT_REDIRECT_URL
- ),
- )
-
- # follow the redirect
- channel = self.make_request(
- "GET",
- # We have to make this relative to be compatible with `make_request(...)`
- get_relative_uri_from_absolute_uri(sso_login_redirect_uri),
- # We have to set the Host header to match the `public_baseurl` to avoid
- # the extra redirect in the `SsoRedirectServlet` in order for the
- # cookies to be visible.
- custom_headers=[
- ("Host", SYNAPSE_SERVER_PUBLIC_HOSTNAME),
- ],
- )
-
- self.assertEqual(channel.code, 302, channel.result)
- location_headers = channel.headers.getRawHeaders("Location")
- assert location_headers
- saml_uri = location_headers[0]
- saml_uri_path, saml_uri_query = saml_uri.split("?", 1)
-
- # it should redirect us to the login page of the SAML server
- self.assertEqual(saml_uri_path, SAML_SERVER)
-
- # the RelayState is used to carry the client redirect url
- saml_uri_params = urllib.parse.parse_qs(saml_uri_query)
- relay_state_param = saml_uri_params["RelayState"][0]
- self.assertEqual(relay_state_param, TEST_CLIENT_REDIRECT_URL)
+ self.assertCountEqual(returned_idps, ["oidc", "oidc-idp1"])
def test_login_via_oidc(self) -> None:
"""If OIDC is chosen, should redirect to the OIDC auth endpoint"""
diff --git a/tests/utils.py b/tests/utils.py
index 0006bd7a8d..5a57c015a9 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -202,7 +202,6 @@ def default_config(
},
"rc_3pid_validation": {"per_second": 10000, "burst_count": 10000},
"rc_presence": {"per_user": {"per_second": 10000, "burst_count": 10000}},
- "saml2_enabled": False,
"public_baseurl": None,
"default_identity_server": None,
"key_refresh_interval": 24 * 60 * 60 * 1000,
|