diff --git a/docs/development/cas.md b/docs/development/cas.md
deleted file mode 100644
index 7c0668e034..0000000000
--- a/docs/development/cas.md
+++ /dev/null
@@ -1,64 +0,0 @@
-# How to test CAS as a developer without a server
-
-The [django-mama-cas](https://github.com/jbittel/django-mama-cas) project is an
-easy to run CAS implementation built on top of Django.
-
-## Prerequisites
-
-1. Create a new virtualenv: `python3 -m venv <your virtualenv>`
-2. Activate your virtualenv: `source /path/to/your/virtualenv/bin/activate`
-3. Install Django and django-mama-cas:
- ```sh
- python -m pip install "django<3" "django-mama-cas==2.4.0"
- ```
-4. Create a Django project in the current directory:
- ```sh
- django-admin startproject cas_test .
- ```
-5. Follow the [install directions](https://django-mama-cas.readthedocs.io/en/latest/installation.html#configuring) for django-mama-cas
-6. Setup the SQLite database: `python manage.py migrate`
-7. Create a user:
- ```sh
- python manage.py createsuperuser
- ```
- 1. Use whatever you want as the username and password.
- 2. Leave the other fields blank.
-8. Use the built-in Django test server to serve the CAS endpoints on port 8000:
- ```sh
- python manage.py runserver
- ```
-
-You should now have a Django project configured to serve CAS authentication with
-a single user created.
-
-## Configure Synapse (and Element) to use CAS
-
-1. Modify your `homeserver.yaml` to enable CAS and point it to your locally
- running Django test server:
- ```yaml
- cas_config:
- enabled: true
- server_url: "http://localhost:8000"
- service_url: "http://localhost:8081"
- #displayname_attribute: name
- #required_attributes:
- # name: value
- ```
-2. Restart Synapse.
-
-Note that the above configuration assumes the homeserver is running on port 8081
-and that the CAS server is on port 8000, both on localhost.
-
-## Testing the configuration
-
-Then in Element:
-
-1. Visit the login page with a Element pointing at your homeserver.
-2. Click the Single Sign-On button.
-3. Login using the credentials created with `createsuperuser`.
-4. You should be logged in.
-
-If you want to repeat this process you'll need to manually logout first:
-
-1. http://localhost:8000/admin/
-2. Click "logout" in the top right.
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 1e8953ae37..b72fb36439 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -3514,53 +3514,6 @@ oidc_providers:
value: synapseUsers
```
---
-### `cas_config`
-
-*(object)* Enable Central Authentication Service (CAS) for registration and login.
-
-This setting has the following sub-options:
-
-* `enabled` (boolean): Set this to true to enable authorization against a CAS server. Defaults to `false`.
-
-* `idp_name` (string): A user-facing name for this identity provider, which is used to offer the user a choice of login mechanisms.
-
-* `idp_icon` (string|null): An optional icon for this identity provider, which is presented by clients and Synapse's own IdP picker page. If given, must be an MXC URI of the format `mxc://<server-name>/<media-id>`. (An easy way to obtain such an MXC URI is to upload an image to an (unencrypted) room and then copy the URL from the source of the event.) Defaults to `null`.
-
-* `idp_brand` (string|null): An optional brand for this identity provider, allowing clients to style the login flow according to the identity provider in question. See the [spec](https://spec.matrix.org/latest/) for possible options here. Defaults to `null`.
-
-* `server_url` (string): The URL of the CAS authorization endpoint.
-
-* `protocol_version` (integer|null): The CAS protocol version. (Version 3 is required if you want to use `required_attributes`). Defaults to `null`.
-
-* `displayname_attribute` (string|null): The attribute of the CAS response to use as the display name. If no name is given here, no displayname will be set. Defaults to `null`.
-
-* `required_attributes` (object): It is possible to configure Synapse to only allow logins if CAS attributes match particular values. All of the keys given below must exist and the values must match the given value. Alternately if the given value is `None` then any value is allowed (the attribute just must exist). All of the listed attributes must match for the login to be permitted. Defaults to `{}`.
-
-* `enable_registration` (boolean): Set to `false` to disable automatic registration of new users. This allows the CAS SSO flow to be limited to sign in only, rather than automatically registering users that have a valid SSO login but do not have a pre-registered account. Defaults to `true`.
-
-* `allow_numeric_ids` (boolean): Set to `true` allow numeric user IDs. This allows CAS SSO flow to provide user IDs composed of numbers only. These identifiers will be prefixed by the letter "u" by default. The prefix can be configured using the `numeric_ids_prefix` option. Be careful to choose the prefix correctly to avoid any possible conflicts (e.g. user 1234 becomes u1234 when a user u1234 already exists). Defaults to `false`.
-
-* `numeric_ids_prefix` (string): The prefix you wish to add in front of a numeric user ID when the `allow_numeric_ids` option is set to `true`. Only alphanumeric characters are allowed.
-
- *Added in Synapse 1.93.0.*
-
- Defaults to `"u"`.
-
-Example configuration:
-```yaml
-cas_config:
- enabled: true
- server_url: https://cas-server.com
- protocol_version: 3
- displayname_attribute: name
- required_attributes:
- userGroup: staff
- department: None
- enable_registration: true
- allow_numeric_ids: true
- numeric_ids_prefix: numericuser
-```
----
### `sso`
*(object)* Additional settings to use with single-sign on systems such as OpenID Connect, SAML2 and CAS.
diff --git a/docs/usage/configuration/user_authentication/single_sign_on/cas.md b/docs/usage/configuration/user_authentication/single_sign_on/cas.md
deleted file mode 100644
index 899face876..0000000000
--- a/docs/usage/configuration/user_authentication/single_sign_on/cas.md
+++ /dev/null
@@ -1,8 +0,0 @@
-# CAS
-
-Synapse supports authenticating users via the [Central Authentication
-Service protocol](https://en.wikipedia.org/wiki/Central_Authentication_Service)
-(CAS) natively.
-
-Please see the [cas_config](../../../configuration/config_documentation.md#cas_config) and [sso](../../../configuration/config_documentation.md#sso)
-sections of the configuration manual for more details.
\ No newline at end of file
diff --git a/docs/workers.md b/docs/workers.md
index 45a00696f3..a254740251 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -315,9 +315,6 @@ using):
# SAML requests.
^/_synapse/client/saml2/authn_response$
- # CAS requests.
- ^/_matrix/client/(api/v1|r0|v3|unstable)/login/cas/ticket$
-
Ensure that all SSO logins go to a single process.
For multiple workers not handling the SSO endpoints properly, see
[#7530](https://github.com/matrix-org/synapse/issues/7530) and
diff --git a/schema/synapse-config.schema.yaml b/schema/synapse-config.schema.yaml
index 52a6d5cf71..d3756cb074 100644
--- a/schema/synapse-config.schema.yaml
+++ b/schema/synapse-config.schema.yaml
@@ -4271,102 +4271,6 @@ properties:
attribute_requirements:
- attribute: userGroup
value: synapseUsers
- cas_config:
- type: object
- description: Enable Central Authentication Service (CAS) for registration and login.
- properties:
- enabled:
- type: boolean
- description: Set this to true to enable authorization against a CAS server.
- default: false
- idp_name:
- type: string
- description: >-
- A user-facing name for this identity provider, which is used to offer
- the user a choice of login mechanisms.
- idp_icon:
- type: ["string", "null"]
- description: >-
- An optional icon for this identity provider, which is presented by
- clients and Synapse's own IdP picker page. If given, must be an MXC
- URI of the format `mxc://<server-name>/<media-id>`. (An easy way to
- obtain such an MXC URI is to upload an image to an (unencrypted) room
- and then copy the URL from the source of the event.)
- default: null
- idp_brand:
- type: ["string", "null"]
- description: >-
- An optional brand for this identity provider, allowing clients to
- style the login flow according to the identity provider in question.
- See the [spec](https://spec.matrix.org/latest/) for possible options
- here.
- default: null
- server_url:
- type: string
- description: The URL of the CAS authorization endpoint.
- protocol_version:
- type: ["integer", "null"]
- description: >-
- The CAS protocol version. (Version 3 is required if you want to use
- `required_attributes`).
- default: null
- displayname_attribute:
- type: ["string", "null"]
- description: >-
- The attribute of the CAS response to use as the display name. If no
- name is given here, no displayname will be set.
- default: null
- required_attributes:
- type: object
- description: >-
- It is possible to configure Synapse to only allow logins if CAS
- attributes match particular values. All of the keys given below must
- exist and the values must match the given value. Alternately if the
- given value is `None` then any value is allowed (the attribute just
- must exist). All of the listed attributes must match for the login to
- be permitted.
- additionalProperties:
- type: ["string", "null"]
- default: {}
- enable_registration:
- type: boolean
- description: >-
- Set to `false` to disable automatic registration of new users. This
- allows the CAS SSO flow to be limited to sign in only, rather than
- automatically registering users that have a valid SSO login but do not
- have a pre-registered account.
- default: true
- allow_numeric_ids:
- type: boolean
- description: >-
- Set to `true` allow numeric user IDs. This allows CAS SSO flow to
- provide user IDs composed of numbers only. These identifiers will be
- prefixed by the letter "u" by default. The prefix can be configured
- using the `numeric_ids_prefix` option. Be careful to choose the prefix
- correctly to avoid any possible conflicts (e.g. user 1234 becomes
- u1234 when a user u1234 already exists).
- default: false
- numeric_ids_prefix:
- type: string
- description: >-
- The prefix you wish to add in front of a numeric user ID when the
- `allow_numeric_ids` option is set to `true`. Only alphanumeric
- characters are allowed.
-
-
- *Added in Synapse 1.93.0.*
- default: u
- examples:
- - enabled: true
- server_url: "https://cas-server.com"
- protocol_version: 3
- displayname_attribute: name
- required_attributes:
- userGroup: staff
- department: None
- enable_registration: true
- allow_numeric_ids: true
- numeric_ids_prefix: numericuser
sso:
type: object
description: >-
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index 8b065f175d..336745be87 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -98,7 +98,6 @@ class RootConfig:
appservice: appservice.AppServiceConfig
key: key.KeyConfig
saml2: saml2.SAML2Config
- cas: cas.CasConfig
sso: sso.SSOConfig
oidc: oidc.OIDCConfig
jwt: jwt.JWTConfig
diff --git a/synapse/config/cas.py b/synapse/config/cas.py
deleted file mode 100644
index c32bf36951..0000000000
--- a/synapse/config/cas.py
+++ /dev/null
@@ -1,113 +0,0 @@
-#
-# This file is licensed under the Affero General Public License (AGPL) version 3.
-#
-# Copyright 2021 The Matrix.org Foundation C.I.C.
-# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright (C) 2023 New Vector, Ltd
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# See the GNU Affero General Public License for more details:
-# <https://www.gnu.org/licenses/agpl-3.0.html>.
-#
-# Originally licensed under the Apache License, Version 2.0:
-# <http://www.apache.org/licenses/LICENSE-2.0>.
-#
-# [This file includes modifications made by New Vector Limited]
-#
-#
-
-from typing import Any, List, Optional
-
-from synapse.config.sso import SsoAttributeRequirement
-from synapse.types import JsonDict
-
-from ._base import Config, ConfigError
-from ._util import validate_config
-
-
-class CasConfig(Config):
- """Cas Configuration
-
- cas_server_url: URL of CAS server
- """
-
- section = "cas"
-
- def read_config(self, config: JsonDict, **kwargs: Any) -> None:
- cas_config = config.get("cas_config", None)
- self.cas_enabled = cas_config and cas_config.get("enabled", True)
-
- if self.cas_enabled:
- self.cas_server_url = cas_config["server_url"]
-
- # TODO Update this to a _synapse URL.
- public_baseurl = self.root.server.public_baseurl
- self.cas_service_url: Optional[str] = (
- public_baseurl + "_matrix/client/r0/login/cas/ticket"
- )
-
- self.cas_protocol_version = cas_config.get("protocol_version")
- if (
- self.cas_protocol_version is not None
- and self.cas_protocol_version not in [1, 2, 3]
- ):
- raise ConfigError(
- "Unsupported CAS protocol version %s (only versions 1, 2, 3 are supported)"
- % (self.cas_protocol_version,),
- ("cas_config", "protocol_version"),
- )
- self.cas_displayname_attribute = cas_config.get("displayname_attribute")
- required_attributes = cas_config.get("required_attributes") or {}
- self.cas_required_attributes = _parsed_required_attributes_def(
- required_attributes
- )
-
- self.cas_enable_registration = cas_config.get("enable_registration", True)
-
- self.cas_allow_numeric_ids = cas_config.get("allow_numeric_ids")
- self.cas_numeric_ids_prefix = cas_config.get("numeric_ids_prefix")
- if (
- self.cas_numeric_ids_prefix is not None
- and self.cas_numeric_ids_prefix.isalnum() is False
- ):
- raise ConfigError(
- "Only alphanumeric characters are allowed for numeric IDs prefix",
- ("cas_config", "numeric_ids_prefix"),
- )
-
- self.idp_name = cas_config.get("idp_name", "CAS")
- self.idp_icon = cas_config.get("idp_icon")
- self.idp_brand = cas_config.get("idp_brand")
-
- else:
- self.cas_server_url = None
- self.cas_service_url = None
- self.cas_protocol_version = None
- self.cas_displayname_attribute = None
- self.cas_required_attributes = []
- self.cas_enable_registration = False
- self.cas_allow_numeric_ids = False
- self.cas_numeric_ids_prefix = "u"
-
-
-# CAS uses a legacy required attributes mapping, not the one provided by
-# SsoAttributeRequirement.
-REQUIRED_ATTRIBUTES_SCHEMA = {
- "type": "object",
- "additionalProperties": {"anyOf": [{"type": "string"}, {"type": "null"}]},
-}
-
-
-def _parsed_required_attributes_def(
- required_attributes: Any,
-) -> List[SsoAttributeRequirement]:
- validate_config(
- REQUIRED_ATTRIBUTES_SCHEMA,
- required_attributes,
- config_path=("cas_config", "required_attributes"),
- )
- return [SsoAttributeRequirement(k, v) for k, v in required_attributes.items()]
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 259b2c70cb..adc37ded95 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -305,7 +305,6 @@ class MSC3861:
if (
root.oidc.oidc_enabled
or root.saml2.saml2_enabled
- or root.cas.cas_enabled
or root.jwt.jwt_enabled
):
raise ConfigError("SSO cannot be enabled when OAuth delegation is enabled")
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 0b2413a83b..430df67a20 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -27,7 +27,6 @@ from .auto_accept_invites import AutoAcceptInvitesConfig
from .background_updates import BackgroundUpdateConfig
from .cache import CacheConfig
from .captcha import CaptchaConfig
-from .cas import CasConfig
from .consent import ConsentConfig
from .database import DatabaseConfig
from .emailconfig import EmailConfig
@@ -87,7 +86,6 @@ class HomeServerConfig(RootConfig):
KeyConfig,
SAML2Config,
OIDCConfig,
- CasConfig,
SSOConfig,
JWTConfig,
AuthConfig,
diff --git a/synapse/handlers/cas.py b/synapse/handlers/cas.py
deleted file mode 100644
index cc3d641b7d..0000000000
--- a/synapse/handlers/cas.py
+++ /dev/null
@@ -1,412 +0,0 @@
-#
-# This file is licensed under the Affero General Public License (AGPL) version 3.
-#
-# Copyright 2020 The Matrix.org Foundation C.I.C.
-# Copyright (C) 2023 New Vector, Ltd
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# See the GNU Affero General Public License for more details:
-# <https://www.gnu.org/licenses/agpl-3.0.html>.
-#
-# Originally licensed under the Apache License, Version 2.0:
-# <http://www.apache.org/licenses/LICENSE-2.0>.
-#
-# [This file includes modifications made by New Vector Limited]
-#
-#
-import logging
-import urllib.parse
-from typing import TYPE_CHECKING, Dict, List, Optional
-from xml.etree import ElementTree as ET
-
-import attr
-
-from twisted.web.client import PartialDownloadError
-
-from synapse.api.errors import HttpResponseException
-from synapse.handlers.sso import MappingException, UserAttributes
-from synapse.http.site import SynapseRequest
-from synapse.types import UserID, map_username_to_mxid_localpart
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class CasError(Exception):
- """Used to catch errors when validating the CAS ticket."""
-
- def __init__(self, error: str, error_description: Optional[str] = None):
- self.error = error
- self.error_description = error_description
-
- def __str__(self) -> str:
- if self.error_description:
- return f"{self.error}: {self.error_description}"
- return self.error
-
-
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class CasResponse:
- username: str
- attributes: Dict[str, List[Optional[str]]]
-
-
-class CasHandler:
- """
- Utility class for to handle the response from a CAS SSO service.
-
- Args:
- hs
- """
-
- def __init__(self, hs: "HomeServer"):
- self.hs = hs
- self._hostname = hs.hostname
- self._store = hs.get_datastores().main
- self._auth_handler = hs.get_auth_handler()
- self._registration_handler = hs.get_registration_handler()
-
- self._cas_server_url = hs.config.cas.cas_server_url
- self._cas_service_url = hs.config.cas.cas_service_url
- self._cas_protocol_version = hs.config.cas.cas_protocol_version
- self._cas_displayname_attribute = hs.config.cas.cas_displayname_attribute
- self._cas_required_attributes = hs.config.cas.cas_required_attributes
- self._cas_enable_registration = hs.config.cas.cas_enable_registration
- self._cas_allow_numeric_ids = hs.config.cas.cas_allow_numeric_ids
- self._cas_numeric_ids_prefix = hs.config.cas.cas_numeric_ids_prefix
-
- self._http_client = hs.get_proxied_http_client()
-
- # identifier for the external_ids table
- self.idp_id = "cas"
-
- # user-facing name of this auth provider
- self.idp_name = hs.config.cas.idp_name
-
- # MXC URI for icon for this auth provider
- self.idp_icon = hs.config.cas.idp_icon
-
- # optional brand identifier for this auth provider
- self.idp_brand = hs.config.cas.idp_brand
-
- self._sso_handler = hs.get_sso_handler()
-
- self._sso_handler.register_identity_provider(self)
-
- def _build_service_param(self, args: Dict[str, str]) -> str:
- """
- Generates a value to use as the "service" parameter when redirecting or
- querying the CAS service.
-
- Args:
- args: Additional arguments to include in the final redirect URL.
-
- Returns:
- The URL to use as a "service" parameter.
- """
- return "%s?%s" % (
- self._cas_service_url,
- urllib.parse.urlencode(args),
- )
-
- async def _validate_ticket(
- self, ticket: str, service_args: Dict[str, str]
- ) -> CasResponse:
- """
- Validate a CAS ticket with the server, and return the parsed the response.
-
- Args:
- ticket: The CAS ticket from the client.
- service_args: Additional arguments to include in the service URL.
- Should be the same as those passed to `handle_redirect_request`.
-
- Raises:
- CasError: If there's an error parsing the CAS response.
-
- Returns:
- The parsed CAS response.
- """
- if self._cas_protocol_version == 3:
- uri = self._cas_server_url + "/p3/proxyValidate"
- else:
- uri = self._cas_server_url + "/proxyValidate"
- args = {
- "ticket": ticket,
- "service": self._build_service_param(service_args),
- }
- try:
- body = await self._http_client.get_raw(uri, args)
- except PartialDownloadError as pde:
- # Twisted raises this error if the connection is closed,
- # even if that's being used old-http style to signal end-of-data
- # Assertion is for mypy's benefit. Error.response is Optional[bytes],
- # but a PartialDownloadError should always have a non-None response.
- assert pde.response is not None
- body = pde.response
- except HttpResponseException as e:
- description = (
- 'Authorization server responded with a "{status}" error '
- "while exchanging the authorization code."
- ).format(status=e.code)
- raise CasError("server_error", description) from e
-
- return self._parse_cas_response(body)
-
- def _parse_cas_response(self, cas_response_body: bytes) -> CasResponse:
- """
- Retrieve the user and other parameters from the CAS response.
-
- Args:
- cas_response_body: The response from the CAS query.
-
- Raises:
- CasError: If there's an error parsing the CAS response.
-
- Returns:
- The parsed CAS response.
- """
-
- # Ensure the response is valid.
- root = ET.fromstring(cas_response_body)
- if not root.tag.endswith("serviceResponse"):
- raise CasError(
- "missing_service_response",
- "root of CAS response is not serviceResponse",
- )
-
- success = root[0].tag.endswith("authenticationSuccess")
- if not success:
- raise CasError("unsucessful_response", "Unsuccessful CAS response")
-
- # Iterate through the nodes and pull out the user and any extra attributes.
- user = None
- attributes: Dict[str, List[Optional[str]]] = {}
- for child in root[0]:
- if child.tag.endswith("user"):
- user = child.text
- # if numeric user IDs are allowed and username is numeric then we add the prefix so Synapse can handle it
- if self._cas_allow_numeric_ids and user is not None and user.isdigit():
- user = f"{self._cas_numeric_ids_prefix}{user}"
- if child.tag.endswith("attributes"):
- for attribute in child:
- # ElementTree library expands the namespace in
- # attribute tags to the full URL of the namespace.
- # We don't care about namespace here and it will always
- # be encased in curly braces, so we remove them.
- tag = attribute.tag
- if "}" in tag:
- tag = tag.split("}")[1]
- attributes.setdefault(tag, []).append(attribute.text)
-
- # Ensure a user was found.
- if user is None:
- raise CasError("no_user", "CAS response does not contain user")
-
- return CasResponse(user, attributes)
-
- async def handle_redirect_request(
- self,
- request: SynapseRequest,
- client_redirect_url: Optional[bytes],
- ui_auth_session_id: Optional[str] = None,
- ) -> str:
- """Generates a URL for the CAS server where the client should be redirected.
-
- Args:
- request: the incoming HTTP request
- client_redirect_url: the URL that we should redirect the
- client to after login (or None for UI Auth).
- ui_auth_session_id: The session ID of the ongoing UI Auth (or
- None if this is a login).
-
- Returns:
- URL to redirect to
- """
-
- if ui_auth_session_id:
- service_args = {"session": ui_auth_session_id}
- else:
- assert client_redirect_url
- service_args = {"redirectUrl": client_redirect_url.decode("utf8")}
-
- args = urllib.parse.urlencode(
- {"service": self._build_service_param(service_args)}
- )
-
- return "%s/login?%s" % (self._cas_server_url, args)
-
- async def handle_ticket(
- self,
- request: SynapseRequest,
- ticket: str,
- client_redirect_url: Optional[str],
- session: Optional[str],
- ) -> None:
- """
- Called once the user has successfully authenticated with the SSO.
- Validates a CAS ticket sent by the client and completes the auth process.
-
- If the user interactive authentication session is provided, marks the
- UI Auth session as complete, then returns an HTML page notifying the
- user they are done.
-
- Otherwise, this registers the user if necessary, and then returns a
- redirect (with a login token) to the client.
-
- Args:
- request: the incoming request from the browser. We'll
- respond to it with a redirect or an HTML page.
-
- ticket: The CAS ticket provided by the client.
-
- client_redirect_url: the redirectUrl parameter from the `/cas/ticket` HTTP request, if given.
- This should be the same as the redirectUrl from the original `/login/sso/redirect` request.
-
- session: The session parameter from the `/cas/ticket` HTTP request, if given.
- This should be the UI Auth session id.
- """
- args = {}
- if client_redirect_url:
- args["redirectUrl"] = client_redirect_url
- if session:
- args["session"] = session
-
- try:
- cas_response = await self._validate_ticket(ticket, args)
- except CasError as e:
- logger.exception("Could not validate ticket")
- self._sso_handler.render_error(request, e.error, e.error_description, 401)
- return
-
- await self._handle_cas_response(
- request, cas_response, client_redirect_url, session
- )
-
- async def _handle_cas_response(
- self,
- request: SynapseRequest,
- cas_response: CasResponse,
- client_redirect_url: Optional[str],
- session: Optional[str],
- ) -> None:
- """Handle a CAS response to a ticket request.
-
- Assumes that the response has been validated. Maps the user onto an MXID,
- registering them if necessary, and returns a response to the browser.
-
- Args:
- request: the incoming request from the browser. We'll respond to it with an
- HTML page or a redirect
-
- cas_response: The parsed CAS response.
-
- client_redirect_url: the redirectUrl parameter from the `/cas/ticket` HTTP request, if given.
- This should be the same as the redirectUrl from the original `/login/sso/redirect` request.
-
- session: The session parameter from the `/cas/ticket` HTTP request, if given.
- This should be the UI Auth session id.
- """
-
- # first check if we're doing a UIA
- if session:
- return await self._sso_handler.complete_sso_ui_auth_request(
- self.idp_id,
- cas_response.username,
- session,
- request,
- )
-
- # otherwise, we're handling a login request.
-
- # Ensure that the attributes of the logged in user meet the required
- # attributes.
- if not self._sso_handler.check_required_attributes(
- request, cas_response.attributes, self._cas_required_attributes
- ):
- return
-
- # Call the mapper to register/login the user
-
- # If this not a UI auth request than there must be a redirect URL.
- assert client_redirect_url is not None
-
- try:
- await self._complete_cas_login(cas_response, request, client_redirect_url)
- except MappingException as e:
- logger.exception("Could not map user")
- self._sso_handler.render_error(request, "mapping_error", str(e))
-
- async def _complete_cas_login(
- self,
- cas_response: CasResponse,
- request: SynapseRequest,
- client_redirect_url: str,
- ) -> None:
- """
- Given a CAS response, complete the login flow
-
- Retrieves the remote user ID, registers the user if necessary, and serves
- a redirect back to the client with a login-token.
-
- Args:
- cas_response: The parsed CAS response.
- request: The request to respond to
- client_redirect_url: The redirect URL passed in by the client.
-
- Raises:
- MappingException if there was a problem mapping the response to a user.
- RedirectException: some mapping providers may raise this if they need
- to redirect to an interstitial page.
- """
- # Note that CAS does not support a mapping provider, so the logic is hard-coded.
- localpart = map_username_to_mxid_localpart(cas_response.username)
-
- async def cas_response_to_user_attributes(failures: int) -> UserAttributes:
- """
- Map from CAS attributes to user attributes.
- """
- # Due to the grandfathering logic matching any previously registered
- # mxids it isn't expected for there to be any failures.
- if failures:
- raise RuntimeError("CAS is not expected to de-duplicate Matrix IDs")
-
- # Arbitrarily use the first attribute found.
- display_name = cas_response.attributes.get(
- self._cas_displayname_attribute, [None]
- )[0]
-
- return UserAttributes(localpart=localpart, display_name=display_name)
-
- async def grandfather_existing_users() -> Optional[str]:
- # Since CAS did not always use the user_external_ids table, always
- # to attempt to map to existing users.
- user_id = UserID(localpart, self._hostname).to_string()
-
- logger.debug(
- "Looking for existing account based on mapped %s",
- user_id,
- )
-
- users = await self._store.get_users_by_id_case_insensitive(user_id)
- if users:
- registered_user_id = list(users.keys())[0]
- logger.info("Grandfathering mapping to %s", registered_user_id)
- return registered_user_id
-
- return None
-
- await self._sso_handler.complete_sso_login_request(
- self.idp_id,
- cas_response.username,
- request,
- client_redirect_url,
- cas_response_to_user_attributes,
- grandfather_existing_users,
- registration_enabled=self._cas_enable_registration,
- )
diff --git a/synapse/rest/client/login.py b/synapse/rest/client/login.py
index 72b219447b..f65f8f2130 100644
--- a/synapse/rest/client/login.py
+++ b/synapse/rest/client/login.py
@@ -81,7 +81,6 @@ class LoginRestServlet(RestServlet):
PATTERNS = client_patterns("/login$", v1=True)
CATEGORY = "Registration/login requests"
- CAS_TYPE = "m.login.cas"
SSO_TYPE = "m.login.sso"
TOKEN_TYPE = "m.login.token"
JWT_TYPE = "org.matrix.login.jwt"
@@ -98,7 +97,6 @@ class LoginRestServlet(RestServlet):
# SSO configuration.
self.saml2_enabled = hs.config.saml2.saml2_enabled
- self.cas_enabled = hs.config.cas.cas_enabled
self.oidc_enabled = hs.config.oidc.oidc_enabled
self._refresh_tokens_enabled = (
hs.config.registration.refreshable_access_token_lifetime is not None
@@ -135,7 +133,7 @@ class LoginRestServlet(RestServlet):
cfg=self.hs.config.ratelimiting.rc_login_account,
)
- # ensure the CAS/SAML/OIDC handlers are loaded on this worker instance.
+ # ensure the SAML/OIDC handlers are loaded on this worker instance.
# The reason for this is to ensure that the auth_provider_ids are registered
# with SsoHandler, which in turn ensures that the login/registration prometheus
# counters are initialised for the auth_provider_ids.
@@ -146,15 +144,10 @@ class LoginRestServlet(RestServlet):
if self.jwt_enabled:
flows.append({"type": LoginRestServlet.JWT_TYPE})
- if self.cas_enabled:
- # we advertise CAS for backwards compat, though MSC1721 renamed it
- # to SSO.
- flows.append({"type": LoginRestServlet.CAS_TYPE})
-
# The login token flow requires m.login.token to be advertised.
support_login_token_flow = self._get_login_token_enabled
- if self.cas_enabled or self.saml2_enabled or self.oidc_enabled:
+ if self.saml2_enabled or self.oidc_enabled:
flows.append(
{
"type": LoginRestServlet.SSO_TYPE,
@@ -626,7 +619,7 @@ class RefreshTokenServlet(RestServlet):
class SsoRedirectServlet(RestServlet):
- PATTERNS = list(client_patterns("/login/(cas|sso)/redirect$", v1=True)) + [
+ PATTERNS = list(client_patterns("/login/sso/redirect$", v1=True)) + [
re.compile(
"^"
+ CLIENT_API_PREFIX
@@ -683,31 +676,6 @@ class SsoRedirectServlet(RestServlet):
finish_request(request)
-class CasTicketServlet(RestServlet):
- PATTERNS = client_patterns("/login/cas/ticket", v1=True)
-
- def __init__(self, hs: "HomeServer"):
- super().__init__()
- self._cas_handler = hs.get_cas_handler()
-
- async def on_GET(self, request: SynapseRequest) -> None:
- client_redirect_url = parse_string(request, "redirectUrl")
- ticket = parse_string(request, "ticket", required=True)
-
- # Maybe get a session ID (if this ticket is from user interactive
- # authentication).
- session = parse_string(request, "session")
-
- # Either client_redirect_url or session must be provided.
- if not client_redirect_url and not session:
- message = "Missing string query parameter redirectUrl or session"
- raise SynapseError(400, message, errcode=Codes.MISSING_PARAM)
-
- await self._cas_handler.handle_ticket(
- request, ticket, client_redirect_url, session
- )
-
-
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if hs.config.experimental.msc3861.enabled:
return
@@ -719,25 +687,20 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
):
RefreshTokenServlet(hs).register(http_server)
if (
- hs.config.cas.cas_enabled
- or hs.config.saml2.saml2_enabled
+ hs.config.saml2.saml2_enabled
or hs.config.oidc.oidc_enabled
):
SsoRedirectServlet(hs).register(http_server)
- if hs.config.cas.cas_enabled:
- CasTicketServlet(hs).register(http_server)
def _load_sso_handlers(hs: "HomeServer") -> None:
"""Ensure that the SSO handlers are loaded, if they are enabled by configuration.
- This is mostly useful to ensure that the CAS/SAML/OIDC handlers register themselves
+ This is mostly useful to ensure that the SAML/OIDC handlers register themselves
with the main SsoHandler.
It's safe to call this multiple times.
"""
- if hs.config.cas.cas_enabled:
- hs.get_cas_handler()
if hs.config.saml2.saml2_enabled:
hs.get_saml_handler()
if hs.config.oidc.oidc_enabled:
diff --git a/synapse/server.py b/synapse/server.py
index 2add4d4e6e..add34b8e8d 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -66,7 +66,6 @@ from synapse.handlers.account_validity import AccountValidityHandler
from synapse.handlers.admin import AdminHandler
from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.handlers.auth import AuthHandler, PasswordAuthProvider
-from synapse.handlers.cas import CasHandler
from synapse.handlers.deactivate_account import DeactivateAccountHandler
from synapse.handlers.delayed_events import DelayedEventsHandler
from synapse.handlers.device import DeviceHandler, DeviceWorkerHandler
@@ -793,10 +792,6 @@ class HomeServer(metaclass=abc.ABCMeta):
return AccountValidityHandler(self)
@cache_in_self
- def get_cas_handler(self) -> CasHandler:
- return CasHandler(self)
-
- @cache_in_self
def get_saml_handler(self) -> "SamlHandler":
from synapse.handlers.saml import SamlHandler
diff --git a/tests/config/test_oauth_delegation.py b/tests/config/test_oauth_delegation.py
index 713bddeb90..45defcd437 100644
--- a/tests/config/test_oauth_delegation.py
+++ b/tests/config/test_oauth_delegation.py
@@ -205,17 +205,6 @@ class MSC3861OAuthDelegation(TestCase):
with self.assertRaises(ConfigError):
self.parse_config()
- def test_cas_sso_cannot_be_enabled(self) -> None:
- self.config_dict["cas_config"] = {
- "enabled": True,
- "server_url": "https://cas-server.com",
- "displayname_attribute": "name",
- "required_attributes": {"userGroup": "staff", "department": "None"},
- }
-
- with self.assertRaises(ConfigError):
- self.parse_config()
-
def test_auth_providers_cannot_be_enabled(self) -> None:
self.config_dict["modules"] = [
{
diff --git a/tests/handlers/test_cas.py b/tests/handlers/test_cas.py
deleted file mode 100644
index f41f7d36ad..0000000000
--- a/tests/handlers/test_cas.py
+++ /dev/null
@@ -1,239 +0,0 @@
-#
-# This file is licensed under the Affero General Public License (AGPL) version 3.
-#
-# Copyright 2020 The Matrix.org Foundation C.I.C.
-# Copyright (C) 2023 New Vector, Ltd
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# See the GNU Affero General Public License for more details:
-# <https://www.gnu.org/licenses/agpl-3.0.html>.
-#
-# Originally licensed under the Apache License, Version 2.0:
-# <http://www.apache.org/licenses/LICENSE-2.0>.
-#
-# [This file includes modifications made by New Vector Limited]
-#
-#
-from typing import Any, Dict
-from unittest.mock import AsyncMock, Mock
-
-from twisted.test.proto_helpers import MemoryReactor
-
-from synapse.handlers.cas import CasResponse
-from synapse.server import HomeServer
-from synapse.util import Clock
-
-from tests.unittest import HomeserverTestCase, override_config
-
-# These are a few constants that are used as config parameters in the tests.
-BASE_URL = "https://synapse/"
-SERVER_URL = "https://issuer/"
-
-
-class CasHandlerTestCase(HomeserverTestCase):
- def default_config(self) -> Dict[str, Any]:
- config = super().default_config()
- config["public_baseurl"] = BASE_URL
- cas_config = {
- "enabled": True,
- "server_url": SERVER_URL,
- "service_url": BASE_URL,
- }
-
- # Update this config with what's in the default config so that
- # override_config works as expected.
- cas_config.update(config.get("cas_config", {}))
- config["cas_config"] = cas_config
-
- return config
-
- def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- hs = self.setup_test_homeserver()
-
- self.handler = hs.get_cas_handler()
-
- # Reduce the number of attempts when generating MXIDs.
- sso_handler = hs.get_sso_handler()
- sso_handler._MAP_USERNAME_RETRIES = 3
-
- return hs
-
- def test_map_cas_user_to_user(self) -> None:
- """Ensure that mapping the CAS user returned from a provider to an MXID works properly."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- cas_response = CasResponse("test_user", {})
- request = _mock_request()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "cas",
- request,
- "redirect_uri",
- None,
- new_user=True,
- auth_provider_session_id=None,
- )
-
- def test_map_cas_user_to_existing_user(self) -> None:
- """Existing users can log in with CAS account."""
- store = self.hs.get_datastores().main
- self.get_success(
- store.register_user(user_id="@test_user:test", password_hash=None)
- )
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- # Map a user via SSO.
- cas_response = CasResponse("test_user", {})
- request = _mock_request()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "cas",
- request,
- "redirect_uri",
- None,
- new_user=False,
- auth_provider_session_id=None,
- )
-
- # Subsequent calls should map to the same mxid.
- auth_handler.complete_sso_login.reset_mock()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "cas",
- request,
- "redirect_uri",
- None,
- new_user=False,
- auth_provider_session_id=None,
- )
-
- def test_map_cas_user_to_invalid_localpart(self) -> None:
- """CAS automaps invalid characters to base-64 encoding."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- cas_response = CasResponse("föö", {})
- request = _mock_request()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@f=c3=b6=c3=b6:test",
- "cas",
- request,
- "redirect_uri",
- None,
- new_user=True,
- auth_provider_session_id=None,
- )
-
- @override_config(
- {
- "cas_config": {
- "required_attributes": {"userGroup": "staff", "department": None}
- }
- }
- )
- def test_required_attributes(self) -> None:
- """The required attributes must be met from the CAS response."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- # The response doesn't have the proper userGroup or department.
- cas_response = CasResponse("test_user", {})
- request = _mock_request()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
- auth_handler.complete_sso_login.assert_not_called()
-
- # The response doesn't have any department.
- cas_response = CasResponse("test_user", {"userGroup": ["staff"]})
- request.reset_mock()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
- auth_handler.complete_sso_login.assert_not_called()
-
- # Add the proper attributes and it should succeed.
- cas_response = CasResponse(
- "test_user", {"userGroup": ["staff", "admin"], "department": ["sales"]}
- )
- request.reset_mock()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
-
- # check that the auth handler got called as expected
- auth_handler.complete_sso_login.assert_called_once_with(
- "@test_user:test",
- "cas",
- request,
- "redirect_uri",
- None,
- new_user=True,
- auth_provider_session_id=None,
- )
-
- @override_config({"cas_config": {"enable_registration": False}})
- def test_map_cas_user_does_not_register_new_user(self) -> None:
- """Ensures new users are not registered if the enabled registration flag is disabled."""
-
- # stub out the auth handler
- auth_handler = self.hs.get_auth_handler()
- auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign]
-
- cas_response = CasResponse("test_user", {})
- request = _mock_request()
- self.get_success(
- self.handler._handle_cas_response(request, cas_response, "redirect_uri", "")
- )
-
- # check that the auth handler was not called as expected
- auth_handler.complete_sso_login.assert_not_called()
-
-
-def _mock_request() -> Mock:
- """Returns a mock which will stand in as a SynapseRequest"""
- mock = Mock(
- spec=[
- "finish",
- "getClientAddress",
- "getHeader",
- "setHeader",
- "setResponseCode",
- "write",
- ]
- )
- # `_disconnected` musn't be another `Mock`, otherwise it will be truthy.
- mock._disconnected = False
- return mock
diff --git a/tests/rest/client/test_login.py b/tests/rest/client/test_login.py
index c5c6604667..0cc7f60921 100644
--- a/tests/rest/client/test_login.py
+++ b/tests/rest/client/test_login.py
@@ -84,9 +84,6 @@ SYNAPSE_SERVER_PUBLIC_HOSTNAME = "synapse"
# https://....
PUBLIC_BASEURL = "http://%s/" % (SYNAPSE_SERVER_PUBLIC_HOSTNAME,)
-# CAS server used in some tests
-CAS_SERVER = "https://fake.test"
-
# just enough to tell pysaml2 where to redirect to
SAML_SERVER = "https://test.saml.server/idp/sso"
TEST_SAML_METADATA = """
@@ -638,12 +635,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
config["public_baseurl"] = PUBLIC_BASEURL
- config["cas_config"] = {
- "enabled": True,
- "server_url": CAS_SERVER,
- "service_url": "https://matrix.goodserver.com:8448",
- }
-
config["saml2_config"] = {
"sp_config": {
"metadata": {"inline": [TEST_SAML_METADATA]},
@@ -689,7 +680,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
self.assertEqual(channel.code, 200, channel.result)
expected_flow_types = [
- "m.login.cas",
"m.login.sso",
"m.login.token",
"m.login.password",
@@ -703,7 +693,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
self.assertCountEqual(
flows["m.login.sso"]["identity_providers"],
[
- {"id": "cas", "name": "CAS"},
{"id": "saml", "name": "SAML"},
{"id": "oidc-idp1", "name": "IDP1"},
{"id": "oidc", "name": "OIDC"},
@@ -738,60 +727,7 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
self.assertEqual(params["redirectUrl"], [TEST_CLIENT_REDIRECT_URL])
returned_idps.append(params["idp"][0])
- self.assertCountEqual(returned_idps, ["cas", "oidc", "oidc-idp1", "saml"])
-
- def test_multi_sso_redirect_to_cas(self) -> None:
- """If CAS is chosen, should redirect to the CAS server"""
-
- channel = self.make_request(
- "GET",
- "/_synapse/client/pick_idp?redirectUrl="
- + urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL)
- + "&idp=cas",
- shorthand=False,
- )
- self.assertEqual(channel.code, 302, channel.result)
- location_headers = channel.headers.getRawHeaders("Location")
- assert location_headers
- sso_login_redirect_uri = location_headers[0]
-
- # it should redirect us to the standard login SSO redirect flow
- self.assertEqual(
- sso_login_redirect_uri,
- self.login_sso_redirect_url_builder.build_login_sso_redirect_uri(
- idp_id="cas", client_redirect_url=TEST_CLIENT_REDIRECT_URL
- ),
- )
-
- # follow the redirect
- channel = self.make_request(
- "GET",
- # We have to make this relative to be compatible with `make_request(...)`
- get_relative_uri_from_absolute_uri(sso_login_redirect_uri),
- # We have to set the Host header to match the `public_baseurl` to avoid
- # the extra redirect in the `SsoRedirectServlet` in order for the
- # cookies to be visible.
- custom_headers=[
- ("Host", SYNAPSE_SERVER_PUBLIC_HOSTNAME),
- ],
- )
-
- self.assertEqual(channel.code, 302, channel.result)
- location_headers = channel.headers.getRawHeaders("Location")
- assert location_headers
- cas_uri = location_headers[0]
- cas_uri_path, cas_uri_query = cas_uri.split("?", 1)
-
- # it should redirect us to the login page of the cas server
- self.assertEqual(cas_uri_path, CAS_SERVER + "/login")
-
- # check that the redirectUrl is correctly encoded in the service param - ie, the
- # place that CAS will redirect to
- cas_uri_params = urllib.parse.parse_qs(cas_uri_query)
- service_uri = cas_uri_params["service"][0]
- _, service_uri_query = service_uri.split("?", 1)
- service_uri_params = urllib.parse.parse_qs(service_uri_query)
- self.assertEqual(service_uri_params["redirectUrl"][0], TEST_CLIENT_REDIRECT_URL)
+ self.assertCountEqual(returned_idps, ["oidc", "oidc-idp1", "saml"])
def test_multi_sso_redirect_to_saml(self) -> None:
"""If SAML is chosen, should redirect to the SAML server"""
@@ -1019,157 +955,6 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
raise ValueError("No %s caveat in macaroon" % (key,))
-class CASTestCase(unittest.HomeserverTestCase):
- servlets = [
- login.register_servlets,
- ]
-
- def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- self.base_url = "https://matrix.goodserver.com/"
- self.redirect_path = "_synapse/client/login/sso/redirect/confirm"
-
- config = self.default_config()
- config["public_baseurl"] = (
- config.get("public_baseurl") or "https://matrix.goodserver.com:8448"
- )
- config["cas_config"] = {
- "enabled": True,
- "server_url": CAS_SERVER,
- }
-
- cas_user_id = "username"
- self.user_id = "@%s:test" % cas_user_id
-
- async def get_raw(uri: str, args: Any) -> bytes:
- """Return an example response payload from a call to the `/proxyValidate`
- endpoint of a CAS server, copied from
- https://apereo.github.io/cas/5.0.x/protocol/CAS-Protocol-V2-Specification.html#26-proxyvalidate-cas-20
-
- This needs to be returned by an async function (as opposed to set as the
- mock's return value) because the corresponding Synapse code awaits on it.
- """
- return (
- """
- <cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'>
- <cas:authenticationSuccess>
- <cas:user>%s</cas:user>
- <cas:proxyGrantingTicket>PGTIOU-84678-8a9d...</cas:proxyGrantingTicket>
- <cas:proxies>
- <cas:proxy>https://proxy2/pgtUrl</cas:proxy>
- <cas:proxy>https://proxy1/pgtUrl</cas:proxy>
- </cas:proxies>
- </cas:authenticationSuccess>
- </cas:serviceResponse>
- """
- % cas_user_id
- ).encode("utf-8")
-
- mocked_http_client = Mock(spec=["get_raw"])
- mocked_http_client.get_raw.side_effect = get_raw
-
- self.hs = self.setup_test_homeserver(
- config=config,
- proxied_http_client=mocked_http_client,
- )
-
- return self.hs
-
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.deactivate_account_handler = hs.get_deactivate_account_handler()
-
- def test_cas_redirect_confirm(self) -> None:
- """Tests that the SSO login flow serves a confirmation page before redirecting a
- user to the redirect URL.
- """
- base_url = "/_matrix/client/r0/login/cas/ticket?redirectUrl"
- redirect_url = "https://dodgy-site.com/"
-
- url_parts = list(urllib.parse.urlparse(base_url))
- query = dict(urllib.parse.parse_qsl(url_parts[4]))
- query.update({"redirectUrl": redirect_url})
- query.update({"ticket": "ticket"})
- url_parts[4] = urllib.parse.urlencode(query)
- cas_ticket_url = urllib.parse.urlunparse(url_parts)
-
- # Get Synapse to call the fake CAS and serve the template.
- channel = self.make_request("GET", cas_ticket_url)
-
- # Test that the response is HTML.
- self.assertEqual(channel.code, 200, channel.result)
- content_type_header_value = ""
- for header in channel.headers.getRawHeaders("Content-Type", []):
- content_type_header_value = header
-
- self.assertTrue(content_type_header_value.startswith("text/html"))
-
- # Test that the body isn't empty.
- self.assertTrue(len(channel.result["body"]) > 0)
-
- # And that it contains our redirect link
- self.assertIn(redirect_url, channel.result["body"].decode("UTF-8"))
-
- @override_config(
- {
- "sso": {
- "client_whitelist": [
- "https://legit-site.com/",
- "https://other-site.com/",
- ]
- }
- }
- )
- def test_cas_redirect_whitelisted(self) -> None:
- """Tests that the SSO login flow serves a redirect to a whitelisted url"""
- self._test_redirect("https://legit-site.com/")
-
- @override_config({"public_baseurl": "https://example.com"})
- def test_cas_redirect_login_fallback(self) -> None:
- self._test_redirect("https://example.com/_matrix/static/client/login")
-
- def _test_redirect(self, redirect_url: str) -> None:
- """Tests that the SSO login flow serves a redirect for the given redirect URL."""
- cas_ticket_url = (
- "/_matrix/client/r0/login/cas/ticket?redirectUrl=%s&ticket=ticket"
- % (urllib.parse.quote(redirect_url))
- )
-
- # Get Synapse to call the fake CAS and serve the template.
- channel = self.make_request("GET", cas_ticket_url)
-
- self.assertEqual(channel.code, 302)
- location_headers = channel.headers.getRawHeaders("Location")
- assert location_headers
- self.assertEqual(location_headers[0][: len(redirect_url)], redirect_url)
-
- @override_config({"sso": {"client_whitelist": ["https://legit-site.com/"]}})
- def test_deactivated_user(self) -> None:
- """Logging in as a deactivated account should error."""
- redirect_url = "https://legit-site.com/"
-
- # First login (to create the user).
- self._test_redirect(redirect_url)
-
- # Deactivate the account.
- self.get_success(
- self.deactivate_account_handler.deactivate_account(
- self.user_id, False, create_requester(self.user_id)
- )
- )
-
- # Request the CAS ticket.
- cas_ticket_url = (
- "/_matrix/client/r0/login/cas/ticket?redirectUrl=%s&ticket=ticket"
- % (urllib.parse.quote(redirect_url))
- )
-
- # Get Synapse to call the fake CAS and serve the template.
- channel = self.make_request("GET", cas_ticket_url)
-
- # Because the user is deactivated they are served an error template.
- self.assertEqual(channel.code, 403)
- self.assertIn(b"SSO account deactivated", channel.result["body"])
-
-
@skip_unless(HAS_JWT, "requires authlib")
class JWTTestCase(unittest.HomeserverTestCase):
servlets = [
|