summary refs log tree commit diff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/config/test_oauth_delegation.py11
-rw-r--r--tests/handlers/test_cas.py239
-rw-r--r--tests/rest/client/test_login.py217
3 files changed, 1 insertions, 466 deletions
diff --git a/tests/config/test_oauth_delegation.py b/tests/config/test_oauth_delegation.py

index 713bddeb90..45defcd437 100644 --- a/tests/config/test_oauth_delegation.py +++ b/tests/config/test_oauth_delegation.py
@@ -205,17 +205,6 @@ class MSC3861OAuthDelegation(TestCase): with self.assertRaises(ConfigError): self.parse_config() - def test_cas_sso_cannot_be_enabled(self) -> None: - self.config_dict["cas_config"] = { - "enabled": True, - "server_url": "https://cas-server.com", - "displayname_attribute": "name", - "required_attributes": {"userGroup": "staff", "department": "None"}, - } - - with self.assertRaises(ConfigError): - self.parse_config() - def test_auth_providers_cannot_be_enabled(self) -> None: self.config_dict["modules"] = [ { diff --git a/tests/handlers/test_cas.py b/tests/handlers/test_cas.py deleted file mode 100644
index f41f7d36ad..0000000000 --- a/tests/handlers/test_cas.py +++ /dev/null
@@ -1,239 +0,0 @@ -# -# This file is licensed under the Affero General Public License (AGPL) version 3. -# -# Copyright 2020 The Matrix.org Foundation C.I.C. -# Copyright (C) 2023 New Vector, Ltd -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# See the GNU Affero General Public License for more details: -# <https://www.gnu.org/licenses/agpl-3.0.html>. -# -# Originally licensed under the Apache License, Version 2.0: -# <http://www.apache.org/licenses/LICENSE-2.0>. -# -# [This file includes modifications made by New Vector Limited] -# -# -from typing import Any, Dict -from unittest.mock import AsyncMock, Mock - -from twisted.test.proto_helpers import MemoryReactor - -from synapse.handlers.cas import CasResponse -from synapse.server import HomeServer -from synapse.util import Clock - -from tests.unittest import HomeserverTestCase, override_config - -# These are a few constants that are used as config parameters in the tests. -BASE_URL = "https://synapse/" -SERVER_URL = "https://issuer/" - - -class CasHandlerTestCase(HomeserverTestCase): - def default_config(self) -> Dict[str, Any]: - config = super().default_config() - config["public_baseurl"] = BASE_URL - cas_config = { - "enabled": True, - "server_url": SERVER_URL, - "service_url": BASE_URL, - } - - # Update this config with what's in the default config so that - # override_config works as expected. - cas_config.update(config.get("cas_config", {})) - config["cas_config"] = cas_config - - return config - - def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver() - - self.handler = hs.get_cas_handler() - - # Reduce the number of attempts when generating MXIDs. - sso_handler = hs.get_sso_handler() - sso_handler._MAP_USERNAME_RETRIES = 3 - - return hs - - def test_map_cas_user_to_user(self) -> None: - """Ensure that mapping the CAS user returned from a provider to an MXID works properly.""" - - # stub out the auth handler - auth_handler = self.hs.get_auth_handler() - auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] - - cas_response = CasResponse("test_user", {}) - request = _mock_request() - self.get_success( - self.handler._handle_cas_response(request, cas_response, "redirect_uri", "") - ) - - # check that the auth handler got called as expected - auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", - "cas", - request, - "redirect_uri", - None, - new_user=True, - auth_provider_session_id=None, - ) - - def test_map_cas_user_to_existing_user(self) -> None: - """Existing users can log in with CAS account.""" - store = self.hs.get_datastores().main - self.get_success( - store.register_user(user_id="@test_user:test", password_hash=None) - ) - - # stub out the auth handler - auth_handler = self.hs.get_auth_handler() - auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] - - # Map a user via SSO. - cas_response = CasResponse("test_user", {}) - request = _mock_request() - self.get_success( - self.handler._handle_cas_response(request, cas_response, "redirect_uri", "") - ) - - # check that the auth handler got called as expected - auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", - "cas", - request, - "redirect_uri", - None, - new_user=False, - auth_provider_session_id=None, - ) - - # Subsequent calls should map to the same mxid. - auth_handler.complete_sso_login.reset_mock() - self.get_success( - self.handler._handle_cas_response(request, cas_response, "redirect_uri", "") - ) - auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", - "cas", - request, - "redirect_uri", - None, - new_user=False, - auth_provider_session_id=None, - ) - - def test_map_cas_user_to_invalid_localpart(self) -> None: - """CAS automaps invalid characters to base-64 encoding.""" - - # stub out the auth handler - auth_handler = self.hs.get_auth_handler() - auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] - - cas_response = CasResponse("föö", {}) - request = _mock_request() - self.get_success( - self.handler._handle_cas_response(request, cas_response, "redirect_uri", "") - ) - - # check that the auth handler got called as expected - auth_handler.complete_sso_login.assert_called_once_with( - "@f=c3=b6=c3=b6:test", - "cas", - request, - "redirect_uri", - None, - new_user=True, - auth_provider_session_id=None, - ) - - @override_config( - { - "cas_config": { - "required_attributes": {"userGroup": "staff", "department": None} - } - } - ) - def test_required_attributes(self) -> None: - """The required attributes must be met from the CAS response.""" - - # stub out the auth handler - auth_handler = self.hs.get_auth_handler() - auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] - - # The response doesn't have the proper userGroup or department. - cas_response = CasResponse("test_user", {}) - request = _mock_request() - self.get_success( - self.handler._handle_cas_response(request, cas_response, "redirect_uri", "") - ) - auth_handler.complete_sso_login.assert_not_called() - - # The response doesn't have any department. - cas_response = CasResponse("test_user", {"userGroup": ["staff"]}) - request.reset_mock() - self.get_success( - self.handler._handle_cas_response(request, cas_response, "redirect_uri", "") - ) - auth_handler.complete_sso_login.assert_not_called() - - # Add the proper attributes and it should succeed. - cas_response = CasResponse( - "test_user", {"userGroup": ["staff", "admin"], "department": ["sales"]} - ) - request.reset_mock() - self.get_success( - self.handler._handle_cas_response(request, cas_response, "redirect_uri", "") - ) - - # check that the auth handler got called as expected - auth_handler.complete_sso_login.assert_called_once_with( - "@test_user:test", - "cas", - request, - "redirect_uri", - None, - new_user=True, - auth_provider_session_id=None, - ) - - @override_config({"cas_config": {"enable_registration": False}}) - def test_map_cas_user_does_not_register_new_user(self) -> None: - """Ensures new users are not registered if the enabled registration flag is disabled.""" - - # stub out the auth handler - auth_handler = self.hs.get_auth_handler() - auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] - - cas_response = CasResponse("test_user", {}) - request = _mock_request() - self.get_success( - self.handler._handle_cas_response(request, cas_response, "redirect_uri", "") - ) - - # check that the auth handler was not called as expected - auth_handler.complete_sso_login.assert_not_called() - - -def _mock_request() -> Mock: - """Returns a mock which will stand in as a SynapseRequest""" - mock = Mock( - spec=[ - "finish", - "getClientAddress", - "getHeader", - "setHeader", - "setResponseCode", - "write", - ] - ) - # `_disconnected` musn't be another `Mock`, otherwise it will be truthy. - mock._disconnected = False - return mock diff --git a/tests/rest/client/test_login.py b/tests/rest/client/test_login.py
index c5c6604667..0cc7f60921 100644 --- a/tests/rest/client/test_login.py +++ b/tests/rest/client/test_login.py
@@ -84,9 +84,6 @@ SYNAPSE_SERVER_PUBLIC_HOSTNAME = "synapse" # https://.... PUBLIC_BASEURL = "http://%s/" % (SYNAPSE_SERVER_PUBLIC_HOSTNAME,) -# CAS server used in some tests -CAS_SERVER = "https://fake.test" - # just enough to tell pysaml2 where to redirect to SAML_SERVER = "https://test.saml.server/idp/sso" TEST_SAML_METADATA = """ @@ -638,12 +635,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase): config["public_baseurl"] = PUBLIC_BASEURL - config["cas_config"] = { - "enabled": True, - "server_url": CAS_SERVER, - "service_url": "https://matrix.goodserver.com:8448", - } - config["saml2_config"] = { "sp_config": { "metadata": {"inline": [TEST_SAML_METADATA]}, @@ -689,7 +680,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200, channel.result) expected_flow_types = [ - "m.login.cas", "m.login.sso", "m.login.token", "m.login.password", @@ -703,7 +693,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase): self.assertCountEqual( flows["m.login.sso"]["identity_providers"], [ - {"id": "cas", "name": "CAS"}, {"id": "saml", "name": "SAML"}, {"id": "oidc-idp1", "name": "IDP1"}, {"id": "oidc", "name": "OIDC"}, @@ -738,60 +727,7 @@ class MultiSSOTestCase(unittest.HomeserverTestCase): self.assertEqual(params["redirectUrl"], [TEST_CLIENT_REDIRECT_URL]) returned_idps.append(params["idp"][0]) - self.assertCountEqual(returned_idps, ["cas", "oidc", "oidc-idp1", "saml"]) - - def test_multi_sso_redirect_to_cas(self) -> None: - """If CAS is chosen, should redirect to the CAS server""" - - channel = self.make_request( - "GET", - "/_synapse/client/pick_idp?redirectUrl=" - + urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL) - + "&idp=cas", - shorthand=False, - ) - self.assertEqual(channel.code, 302, channel.result) - location_headers = channel.headers.getRawHeaders("Location") - assert location_headers - sso_login_redirect_uri = location_headers[0] - - # it should redirect us to the standard login SSO redirect flow - self.assertEqual( - sso_login_redirect_uri, - self.login_sso_redirect_url_builder.build_login_sso_redirect_uri( - idp_id="cas", client_redirect_url=TEST_CLIENT_REDIRECT_URL - ), - ) - - # follow the redirect - channel = self.make_request( - "GET", - # We have to make this relative to be compatible with `make_request(...)` - get_relative_uri_from_absolute_uri(sso_login_redirect_uri), - # We have to set the Host header to match the `public_baseurl` to avoid - # the extra redirect in the `SsoRedirectServlet` in order for the - # cookies to be visible. - custom_headers=[ - ("Host", SYNAPSE_SERVER_PUBLIC_HOSTNAME), - ], - ) - - self.assertEqual(channel.code, 302, channel.result) - location_headers = channel.headers.getRawHeaders("Location") - assert location_headers - cas_uri = location_headers[0] - cas_uri_path, cas_uri_query = cas_uri.split("?", 1) - - # it should redirect us to the login page of the cas server - self.assertEqual(cas_uri_path, CAS_SERVER + "/login") - - # check that the redirectUrl is correctly encoded in the service param - ie, the - # place that CAS will redirect to - cas_uri_params = urllib.parse.parse_qs(cas_uri_query) - service_uri = cas_uri_params["service"][0] - _, service_uri_query = service_uri.split("?", 1) - service_uri_params = urllib.parse.parse_qs(service_uri_query) - self.assertEqual(service_uri_params["redirectUrl"][0], TEST_CLIENT_REDIRECT_URL) + self.assertCountEqual(returned_idps, ["oidc", "oidc-idp1", "saml"]) def test_multi_sso_redirect_to_saml(self) -> None: """If SAML is chosen, should redirect to the SAML server""" @@ -1019,157 +955,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase): raise ValueError("No %s caveat in macaroon" % (key,)) -class CASTestCase(unittest.HomeserverTestCase): - servlets = [ - login.register_servlets, - ] - - def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - self.base_url = "https://matrix.goodserver.com/" - self.redirect_path = "_synapse/client/login/sso/redirect/confirm" - - config = self.default_config() - config["public_baseurl"] = ( - config.get("public_baseurl") or "https://matrix.goodserver.com:8448" - ) - config["cas_config"] = { - "enabled": True, - "server_url": CAS_SERVER, - } - - cas_user_id = "username" - self.user_id = "@%s:test" % cas_user_id - - async def get_raw(uri: str, args: Any) -> bytes: - """Return an example response payload from a call to the `/proxyValidate` - endpoint of a CAS server, copied from - https://apereo.github.io/cas/5.0.x/protocol/CAS-Protocol-V2-Specification.html#26-proxyvalidate-cas-20 - - This needs to be returned by an async function (as opposed to set as the - mock's return value) because the corresponding Synapse code awaits on it. - """ - return ( - """ - <cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'> - <cas:authenticationSuccess> - <cas:user>%s</cas:user> - <cas:proxyGrantingTicket>PGTIOU-84678-8a9d...</cas:proxyGrantingTicket> - <cas:proxies> - <cas:proxy>https://proxy2/pgtUrl</cas:proxy> - <cas:proxy>https://proxy1/pgtUrl</cas:proxy> - </cas:proxies> - </cas:authenticationSuccess> - </cas:serviceResponse> - """ - % cas_user_id - ).encode("utf-8") - - mocked_http_client = Mock(spec=["get_raw"]) - mocked_http_client.get_raw.side_effect = get_raw - - self.hs = self.setup_test_homeserver( - config=config, - proxied_http_client=mocked_http_client, - ) - - return self.hs - - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - self.deactivate_account_handler = hs.get_deactivate_account_handler() - - def test_cas_redirect_confirm(self) -> None: - """Tests that the SSO login flow serves a confirmation page before redirecting a - user to the redirect URL. - """ - base_url = "/_matrix/client/r0/login/cas/ticket?redirectUrl" - redirect_url = "https://dodgy-site.com/" - - url_parts = list(urllib.parse.urlparse(base_url)) - query = dict(urllib.parse.parse_qsl(url_parts[4])) - query.update({"redirectUrl": redirect_url}) - query.update({"ticket": "ticket"}) - url_parts[4] = urllib.parse.urlencode(query) - cas_ticket_url = urllib.parse.urlunparse(url_parts) - - # Get Synapse to call the fake CAS and serve the template. - channel = self.make_request("GET", cas_ticket_url) - - # Test that the response is HTML. - self.assertEqual(channel.code, 200, channel.result) - content_type_header_value = "" - for header in channel.headers.getRawHeaders("Content-Type", []): - content_type_header_value = header - - self.assertTrue(content_type_header_value.startswith("text/html")) - - # Test that the body isn't empty. - self.assertTrue(len(channel.result["body"]) > 0) - - # And that it contains our redirect link - self.assertIn(redirect_url, channel.result["body"].decode("UTF-8")) - - @override_config( - { - "sso": { - "client_whitelist": [ - "https://legit-site.com/", - "https://other-site.com/", - ] - } - } - ) - def test_cas_redirect_whitelisted(self) -> None: - """Tests that the SSO login flow serves a redirect to a whitelisted url""" - self._test_redirect("https://legit-site.com/") - - @override_config({"public_baseurl": "https://example.com"}) - def test_cas_redirect_login_fallback(self) -> None: - self._test_redirect("https://example.com/_matrix/static/client/login") - - def _test_redirect(self, redirect_url: str) -> None: - """Tests that the SSO login flow serves a redirect for the given redirect URL.""" - cas_ticket_url = ( - "/_matrix/client/r0/login/cas/ticket?redirectUrl=%s&ticket=ticket" - % (urllib.parse.quote(redirect_url)) - ) - - # Get Synapse to call the fake CAS and serve the template. - channel = self.make_request("GET", cas_ticket_url) - - self.assertEqual(channel.code, 302) - location_headers = channel.headers.getRawHeaders("Location") - assert location_headers - self.assertEqual(location_headers[0][: len(redirect_url)], redirect_url) - - @override_config({"sso": {"client_whitelist": ["https://legit-site.com/"]}}) - def test_deactivated_user(self) -> None: - """Logging in as a deactivated account should error.""" - redirect_url = "https://legit-site.com/" - - # First login (to create the user). - self._test_redirect(redirect_url) - - # Deactivate the account. - self.get_success( - self.deactivate_account_handler.deactivate_account( - self.user_id, False, create_requester(self.user_id) - ) - ) - - # Request the CAS ticket. - cas_ticket_url = ( - "/_matrix/client/r0/login/cas/ticket?redirectUrl=%s&ticket=ticket" - % (urllib.parse.quote(redirect_url)) - ) - - # Get Synapse to call the fake CAS and serve the template. - channel = self.make_request("GET", cas_ticket_url) - - # Because the user is deactivated they are served an error template. - self.assertEqual(channel.code, 403) - self.assertIn(b"SSO account deactivated", channel.result["body"]) - - @skip_unless(HAS_JWT, "requires authlib") class JWTTestCase(unittest.HomeserverTestCase): servlets = [