diff --git a/tests/config/test_oauth_delegation.py b/tests/config/test_oauth_delegation.py
index 713bddeb90..45defcd437 100644
--- a/tests/config/test_oauth_delegation.py
+++ b/tests/config/test_oauth_delegation.py
@@ -205,17 +205,6 @@ class MSC3861OAuthDelegation(TestCase):
with self.assertRaises(ConfigError):
self.parse_config()
- def test_cas_sso_cannot_be_enabled(self) -> None:
- self.config_dict["cas_config"] = {
- "enabled": True,
- "server_url": "https://cas-server.com",
- "displayname_attribute": "name",
- "required_attributes": {"userGroup": "staff", "department": "None"},
- }
-
- with self.assertRaises(ConfigError):
- self.parse_config()
-
def test_auth_providers_cannot_be_enabled(self) -> None:
self.config_dict["modules"] = [
{
diff --git a/tests/handlers/test_cas.py b/tests/handlers/test_cas.py
deleted file mode 100644
index f41f7d36ad..0000000000
--- a/tests/handlers/test_cas.py
+++ /dev/null
@@ -1,239 +0,0 @@
-#
-# This file is licensed under the Affero General Public License (AGPL) version 3.
-#
-# Copyright 2020 The Matrix.org Foundation C.I.C.
-# Copyright (C) 2023 New Vector, Ltd
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# See the GNU Affero General Public License for more details:
-# <https://www.gnu.org/licenses/agpl-3.0.html>.
-#
-# Originally licensed under the Apache License, Version 2.0:
-# <http://www.apache.org/licenses/LICENSE-2.0>.
-#
-# [This file includes modifications made by New Vector Limited]
-#
-#
-from typing import Any, Dict
-from unittest.mock import AsyncMock, Mock
-
-from twisted.test.proto_helpers import MemoryReactor
-
-from synapse.handlers.cas import CasResponse
-from synapse.server import HomeServer
-from synapse.util import Clock
-
-from tests.unittest import HomeserverTestCase, override_config
-
-# These are a few constants that are used as config parameters in the tests.
-BASE_URL = "https://synapse/"
-SERVER_URL = "https://issuer/"
-
-
-class CasHandlerTestCase(HomeserverTestCase):
- def default_config(self) -> Dict[str, Any]:
- config = super().default_config()
- config["public_baseurl"] = BASE_URL
- cas_config = {
- "enabled": True,
- "server_url": SERVER_URL,
- "service_url": BASE_URL,
- }
-
- # Update this config with what's in the default config so that
- # override_config works as expected.
- cas_config.update(config.get("cas_config", {}))
- config["cas_config"] = cas_config
-
- return config
-
- def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- hs = self.setup_test_homeserver()
-
- self.handler = hs.get_cas_handler()
-
- # Reduce the number of attempts when generating MXIDs.
- sso_handler = hs.get_sso_handler()
- sso_handler._MAP_USERNAME_RETRIES = 3
-
- return hs
-
- def test_map_cas_user_to_user(self) -> None:
- """Ensure that mapping the CAS user returned from a provider to an MXID works properly."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- cas_response = CasResponse("test_user", {})
- request = _mock_request()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "cas",
- request,
- "redirect_uri",
- None,
- new_user=True,
- auth_provider_session_id=None,
- )
-
- def test_map_cas_user_to_existing_user(self) -> None:
- """Existing users can log in with CAS account."""
- store = self.hs.get_datastores().main
- self.get_success(
- store.register_user(user_id="@test_user:test", password_hash=None)
- )
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- # Map a user via SSO.
- cas_response = CasResponse("test_user", {})
- request = _mock_request()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "cas",
- request,
- "redirect_uri",
- None,
- new_user=False,
- auth_provider_session_id=None,
- )
-
- # Subsequent calls should map to the same mxid.
- auth_handler.complete_sso_login.reset_mock()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "cas",
- request,
- "redirect_uri",
- None,
- new_user=False,
- auth_provider_session_id=None,
- )
-
- def test_map_cas_user_to_invalid_localpart(self) -> None:
- """CAS automaps invalid characters to base-64 encoding."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- cas_response = CasResponse("föö", {})
- request = _mock_request()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@f=c3=b6=c3=b6:test",
- "cas",
- request,
- "redirect_uri",
- None,
- new_user=True,
- auth_provider_session_id=None,
- )
-
- @override_config(
- {
- "cas_config": {
- "required_attributes": {"userGroup": "staff", "department": None}
- }
- }
- )
- def test_required_attributes(self) -> None:
- """The required attributes must be met from the CAS response."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- # The response doesn't have the proper userGroup or department.
- cas_response = CasResponse("test_user", {})
- request = _mock_request()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
- auth_handler.complete_sso_login.assert_not_called()
-
- # The response doesn't have any department.
- cas_response = CasResponse("test_user", {"userGroup": ["staff"]})
- request.reset_mock()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
- auth_handler.complete_sso_login.assert_not_called()
-
- # Add the proper attributes and it should succeed.
- cas_response = CasResponse(
- "test_user", {"userGroup": ["staff", "admin"], "department": ["sales"]}
- )
- request.reset_mock()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "cas",
- request,
- "redirect_uri",
- None,
- new_user=True,
- auth_provider_session_id=None,
- )
-
- @override_config({"cas_config": {"enable_registration": False}})
- def test_map_cas_user_does_not_register_new_user(self) -> None:
- """Ensures new users are not registered if the enabled registration flag is disabled."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- cas_response = CasResponse("test_user", {})
- request = _mock_request()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
-
- # check that the auth handler was not called as expected
- auth_handler.complete_sso_login.assert_not_called()
-
-
-def _mock_request() -> Mock:
- """Returns a mock which will stand in as a SynapseRequest"""
- mock = Mock(
- spec=[
- "finish",
- "getClientAddress",
- "getHeader",
- "setHeader",
- "setResponseCode",
- "write",
- ]
- )
- # `_disconnected` musn't be another `Mock`, otherwise it will be truthy.
- mock._disconnected = False
- return mock
diff --git a/tests/rest/client/test_login.py b/tests/rest/client/test_login.py
index c5c6604667..0cc7f60921 100644
--- a/tests/rest/client/test_login.py
+++ b/tests/rest/client/test_login.py
@@ -84,9 +84,6 @@ SYNAPSE_SERVER_PUBLIC_HOSTNAME = "synapse"
# https://....
PUBLIC_BASEURL = "http://%s/" % (SYNAPSE_SERVER_PUBLIC_HOSTNAME,)
-# CAS server used in some tests
-CAS_SERVER = "https://fake.test"
-
# just enough to tell pysaml2 where to redirect to
SAML_SERVER = "https://test.saml.server/idp/sso"
TEST_SAML_METADATA = """
@@ -638,12 +635,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
config["public_baseurl"] = PUBLIC_BASEURL
- config["cas_config"] = {
- "enabled": True,
- "server_url": CAS_SERVER,
- "service_url": "https://matrix.goodserver.com:8448",
- }
-
config["saml2_config"] = {
"sp_config": {
"metadata": {"inline": [TEST_SAML_METADATA]},
@@ -689,7 +680,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.code, 200, channel.result)
expected_flow_types = [
- "m.login.cas",
"m.login.sso",
"m.login.token",
"m.login.password",
@@ -703,7 +693,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
self.assertCountEqual(
flows["m.login.sso"]["identity_providers"],
[
- {"id": "cas", "name": "CAS"},
{"id": "saml", "name": "SAML"},
{"id": "oidc-idp1", "name": "IDP1"},
{"id": "oidc", "name": "OIDC"},
@@ -738,60 +727,7 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
self.assertEqual(params["redirectUrl"], [TEST_CLIENT_REDIRECT_URL])
returned_idps.append(params["idp"][0])
- self.assertCountEqual(returned_idps, ["cas", "oidc", "oidc-idp1", "saml"])
-
- def test_multi_sso_redirect_to_cas(self) -> None:
- """If CAS is chosen, should redirect to the CAS server"""
-
- channel = self.make_request(
- "GET",
- "/_synapse/client/pick_idp?redirectUrl="
- + urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL)
- + "&idp=cas",
- shorthand=False,
- )
- self.assertEqual(channel.code, 302, channel.result)
- location_headers = channel.headers.getRawHeaders("Location")
- assert location_headers
- sso_login_redirect_uri = location_headers[0]
-
- # it should redirect us to the standard login SSO redirect flow
- self.assertEqual(
- sso_login_redirect_uri,
- self.login_sso_redirect_url_builder.build_login_sso_redirect_uri(
- idp_id="cas", client_redirect_url=TEST_CLIENT_REDIRECT_URL
- ),
- )
-
- # follow the redirect
- channel = self.make_request(
- "GET",
- # We have to make this relative to be compatible with `make_request(...)`
- get_relative_uri_from_absolute_uri(sso_login_redirect_uri),
- # We have to set the Host header to match the `public_baseurl` to avoid
- # the extra redirect in the `SsoRedirectServlet` in order for the
- # cookies to be visible.
- custom_headers=[
- ("Host", SYNAPSE_SERVER_PUBLIC_HOSTNAME),
- ],
- )
-
- self.assertEqual(channel.code, 302, channel.result)
- location_headers = channel.headers.getRawHeaders("Location")
- assert location_headers
- cas_uri = location_headers[0]
- cas_uri_path, cas_uri_query = cas_uri.split("?", 1)
-
- # it should redirect us to the login page of the cas server
- self.assertEqual(cas_uri_path, CAS_SERVER + "/login")
-
- # check that the redirectUrl is correctly encoded in the service param - ie, the
- # place that CAS will redirect to
- cas_uri_params = urllib.parse.parse_qs(cas_uri_query)
- service_uri = cas_uri_params["service"][0]
- _, service_uri_query = service_uri.split("?", 1)
- service_uri_params = urllib.parse.parse_qs(service_uri_query)
- self.assertEqual(service_uri_params["redirectUrl"][0], TEST_CLIENT_REDIRECT_URL)
+ self.assertCountEqual(returned_idps, ["oidc", "oidc-idp1", "saml"])
def test_multi_sso_redirect_to_saml(self) -> None:
"""If SAML is chosen, should redirect to the SAML server"""
@@ -1019,157 +955,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
raise ValueError("No %s caveat in macaroon" % (key,))
-class CASTestCase(unittest.HomeserverTestCase):
- servlets = [
- login.register_servlets,
- ]
-
- def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- self.base_url = "https://matrix.goodserver.com/"
- self.redirect_path = "_synapse/client/login/sso/redirect/confirm"
-
- config = self.default_config()
- config["public_baseurl"] = (
- config.get("public_baseurl") or "https://matrix.goodserver.com:8448"
- )
- config["cas_config"] = {
- "enabled": True,
- "server_url": CAS_SERVER,
- }
-
- cas_user_id = "username"
- self.user_id = "@%s:test" % cas_user_id
-
- async def get_raw(uri: str, args: Any) -> bytes:
- """Return an example response payload from a call to the `/proxyValidate`
- endpoint of a CAS server, copied from
- https://apereo.github.io/cas/5.0.x/protocol/CAS-Protocol-V2-Specification.html#26-proxyvalidate-cas-20
-
- This needs to be returned by an async function (as opposed to set as the
- mock's return value) because the corresponding Synapse code awaits on it.
- """
- return (
- """
- <cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
- <cas:authenticationSuccess>
- <cas:user>%s</cas:user>
- <cas:proxyGrantingTicket>PGTIOU-84678-8a9d...</cas:proxyGrantingTicket>
- <cas:proxies>
- <cas:proxy>https://proxy2/pgtUrl</cas:proxy>
- <cas:proxy>https://proxy1/pgtUrl</cas:proxy>
- </cas:proxies>
- </cas:authenticationSuccess>
- </cas:serviceResponse>
- """
- % cas_user_id
- ).encode("utf-8")
-
- mocked_http_client = Mock(spec=["get_raw"])
- mocked_http_client.get_raw.side_effect = get_raw
-
- self.hs = self.setup_test_homeserver(
- config=config,
- proxied_http_client=mocked_http_client,
- )
-
- return self.hs
-
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.deactivate_account_handler = hs.get_deactivate_account_handler()
-
- def test_cas_redirect_confirm(self) -> None:
- """Tests that the SSO login flow serves a confirmation page before redirecting a
- user to the redirect URL.
- """
- base_url = "/_matrix/client/r0/login/cas/ticket?redirectUrl"
- redirect_url = "https://dodgy-site.com/"
-
- url_parts = list(urllib.parse.urlparse(base_url))
- query = dict(urllib.parse.parse_qsl(url_parts[4]))
- query.update({"redirectUrl": redirect_url})
- query.update({"ticket": "ticket"})
- url_parts[4] = urllib.parse.urlencode(query)
- cas_ticket_url = urllib.parse.urlunparse(url_parts)
-
- # Get Synapse to call the fake CAS and serve the template.
- channel = self.make_request("GET", cas_ticket_url)
-
- # Test that the response is HTML.
- self.assertEqual(channel.code, 200, channel.result)
- content_type_header_value = ""
- for header in channel.headers.getRawHeaders("Content-Type", []):
- content_type_header_value = header
-
- self.assertTrue(content_type_header_value.startswith("text/html"))
-
- # Test that the body isn't empty.
- self.assertTrue(len(channel.result["body"]) > 0)
-
- # And that it contains our redirect link
- self.assertIn(redirect_url, channel.result["body"].decode("UTF-8"))
-
- @override_config(
- {
- "sso": {
- "client_whitelist": [
- "https://legit-site.com/",
- "https://other-site.com/",
- ]
- }
- }
- )
- def test_cas_redirect_whitelisted(self) -> None:
- """Tests that the SSO login flow serves a redirect to a whitelisted url"""
- self._test_redirect("https://legit-site.com/")
-
- @override_config({"public_baseurl": "https://example.com"})
- def test_cas_redirect_login_fallback(self) -> None:
- self._test_redirect("https://example.com/_matrix/static/client/login")
-
- def _test_redirect(self, redirect_url: str) -> None:
- """Tests that the SSO login flow serves a redirect for the given redirect URL."""
- cas_ticket_url = (
- "/_matrix/client/r0/login/cas/ticket?redirectUrl=%s&ticket=ticket"
- % (urllib.parse.quote(redirect_url))
- )
-
- # Get Synapse to call the fake CAS and serve the template.
- channel = self.make_request("GET", cas_ticket_url)
-
- self.assertEqual(channel.code, 302)
- location_headers = channel.headers.getRawHeaders("Location")
- assert location_headers
- self.assertEqual(location_headers[0][: len(redirect_url)], redirect_url)
-
- @override_config({"sso": {"client_whitelist": ["https://legit-site.com/"]}})
- def test_deactivated_user(self) -> None:
- """Logging in as a deactivated account should error."""
- redirect_url = "https://legit-site.com/"
-
- # First login (to create the user).
- self._test_redirect(redirect_url)
-
- # Deactivate the account.
- self.get_success(
- self.deactivate_account_handler.deactivate_account(
- self.user_id, False, create_requester(self.user_id)
- )
- )
-
- # Request the CAS ticket.
- cas_ticket_url = (
- "/_matrix/client/r0/login/cas/ticket?redirectUrl=%s&ticket=ticket"
- % (urllib.parse.quote(redirect_url))
- )
-
- # Get Synapse to call the fake CAS and serve the template.
- channel = self.make_request("GET", cas_ticket_url)
-
- # Because the user is deactivated they are served an error template.
- self.assertEqual(channel.code, 403)
- self.assertIn(b"SSO account deactivated", channel.result["body"])
-
-
@skip_unless(HAS_JWT, "requires authlib")
class JWTTestCase(unittest.HomeserverTestCase):
servlets = [
|