diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py
index 5e9c9c2e88..c7796fb837 100644
--- a/tests/handlers/test_oidc.py
+++ b/tests/handlers/test_oidc.py
@@ -989,6 +989,138 @@ class OidcHandlerTestCase(HomeserverTestCase):
self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
self.assertRenderedError("mapping_error", "localpart is invalid: ")
+ @override_config(
+ {
+ "oidc_config": {
+ **DEFAULT_CONFIG,
+ "attribute_requirements": [{"attribute": "test", "value": "foobar"}],
+ }
+ }
+ )
+ def test_attribute_requirements(self):
+ """The required attributes must be met from the OIDC userinfo response."""
+ auth_handler = self.hs.get_auth_handler()
+ auth_handler.complete_sso_login = simple_async_mock()
+
+ # userinfo lacking "test": "foobar" attribute should fail.
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ }
+ self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
+ auth_handler.complete_sso_login.assert_not_called()
+
+ # userinfo with "test": "foobar" attribute should succeed.
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ "test": "foobar",
+ }
+ self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
+
+ # check that the auth handler got called as expected
+ auth_handler.complete_sso_login.assert_called_once_with(
+ "@tester:test", "oidc", ANY, ANY, None, new_user=True
+ )
+
+ @override_config(
+ {
+ "oidc_config": {
+ **DEFAULT_CONFIG,
+ "attribute_requirements": [{"attribute": "test", "value": "foobar"}],
+ }
+ }
+ )
+ def test_attribute_requirements_contains(self):
+ """Test that auth succeeds if userinfo attribute CONTAINS required value"""
+ auth_handler = self.hs.get_auth_handler()
+ auth_handler.complete_sso_login = simple_async_mock()
+ # userinfo with "test": ["foobar", "foo", "bar"] attribute should succeed.
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ "test": ["foobar", "foo", "bar"],
+ }
+ self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
+
+ # check that the auth handler got called as expected
+ auth_handler.complete_sso_login.assert_called_once_with(
+ "@tester:test", "oidc", ANY, ANY, None, new_user=True
+ )
+
+ @override_config(
+ {
+ "oidc_config": {
+ **DEFAULT_CONFIG,
+ "attribute_requirements": [{"attribute": "test", "value": "foobar"}],
+ }
+ }
+ )
+ def test_attribute_requirements_mismatch(self):
+ """
+ Test that auth fails if attributes exist but don't match,
+ or are non-string values.
+ """
+ auth_handler = self.hs.get_auth_handler()
+ auth_handler.complete_sso_login = simple_async_mock()
+ # userinfo with "test": "not_foobar" attribute should fail
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ "test": "not_foobar",
+ }
+ self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
+ auth_handler.complete_sso_login.assert_not_called()
+
+ # userinfo with "test": ["foo", "bar"] attribute should fail
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ "test": ["foo", "bar"],
+ }
+ self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
+ auth_handler.complete_sso_login.assert_not_called()
+
+ # userinfo with "test": False attribute should fail
+ # this is largely just to ensure we don't crash here
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ "test": False,
+ }
+ self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
+ auth_handler.complete_sso_login.assert_not_called()
+
+ # userinfo with "test": None attribute should fail
+ # a value of None breaks the OIDC spec, but it's important to not crash here
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ "test": None,
+ }
+ self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
+ auth_handler.complete_sso_login.assert_not_called()
+
+ # userinfo with "test": 1 attribute should fail
+ # this is largely just to ensure we don't crash here
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ "test": 1,
+ }
+ self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
+ auth_handler.complete_sso_login.assert_not_called()
+
+ # userinfo with "test": 3.14 attribute should fail
+ # this is largely just to ensure we don't crash here
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ "test": 3.14,
+ }
+ self.get_success(_make_callback_with_userinfo(self.hs, userinfo))
+ auth_handler.complete_sso_login.assert_not_called()
+
def _generate_oidc_session_token(
self,
state: str,
diff --git a/tests/rest/client/v2_alpha/test_capabilities.py b/tests/rest/client/v2_alpha/test_capabilities.py
index e808339fb3..287a1a485c 100644
--- a/tests/rest/client/v2_alpha/test_capabilities.py
+++ b/tests/rest/client/v2_alpha/test_capabilities.py
@@ -18,6 +18,7 @@ from synapse.rest.client.v1 import login
from synapse.rest.client.v2_alpha import capabilities
from tests import unittest
+from tests.unittest import override_config
class CapabilitiesTestCase(unittest.HomeserverTestCase):
@@ -33,6 +34,7 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase):
hs = self.setup_test_homeserver()
self.store = hs.get_datastore()
self.config = hs.config
+ self.auth_handler = hs.get_auth_handler()
return hs
def test_check_auth_required(self):
@@ -56,7 +58,7 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase):
capabilities["m.room_versions"]["default"],
)
- def test_get_change_password_capabilities(self):
+ def test_get_change_password_capabilities_password_login(self):
localpart = "user"
password = "pass"
user = self.register_user(localpart, password)
@@ -66,10 +68,36 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase):
capabilities = channel.json_body["capabilities"]
self.assertEqual(channel.code, 200)
-
- # Test case where password is handled outside of Synapse
self.assertTrue(capabilities["m.change_password"]["enabled"])
- self.get_success(self.store.user_set_password_hash(user, None))
+
+ @override_config({"password_config": {"localdb_enabled": False}})
+ def test_get_change_password_capabilities_localdb_disabled(self):
+ localpart = "user"
+ password = "pass"
+ user = self.register_user(localpart, password)
+ access_token = self.get_success(
+ self.auth_handler.get_access_token_for_user_id(
+ user, device_id=None, valid_until_ms=None
+ )
+ )
+
+ channel = self.make_request("GET", self.url, access_token=access_token)
+ capabilities = channel.json_body["capabilities"]
+
+ self.assertEqual(channel.code, 200)
+ self.assertFalse(capabilities["m.change_password"]["enabled"])
+
+ @override_config({"password_config": {"enabled": False}})
+ def test_get_change_password_capabilities_password_disabled(self):
+ localpart = "user"
+ password = "pass"
+ user = self.register_user(localpart, password)
+ access_token = self.get_success(
+ self.auth_handler.get_access_token_for_user_id(
+ user, device_id=None, valid_until_ms=None
+ )
+ )
+
channel = self.make_request("GET", self.url, access_token=access_token)
capabilities = channel.json_body["capabilities"]
diff --git a/tests/unittest.py b/tests/unittest.py
index ca7031c724..224f037ce1 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -140,7 +140,7 @@ class TestCase(unittest.TestCase):
try:
self.assertEquals(attrs[key], getattr(obj, key))
except AssertionError as e:
- raise (type(e))(e.message + " for '.%s'" % key)
+ raise (type(e))("Assert error for '.{}':".format(key)) from e
def assert_dict(self, required, actual):
"""Does a partial assert of a dict.
|