diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py
index 0d51705849..e880d32be6 100644
--- a/tests/handlers/test_oidc.py
+++ b/tests/handlers/test_oidc.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import json
from urllib.parse import parse_qs, urlparse
@@ -24,12 +23,8 @@ import pymacaroons
from twisted.python.failure import Failure
from twisted.web._newclient import ResponseDone
-from synapse.handlers.oidc_handler import (
- MappingException,
- OidcError,
- OidcHandler,
- OidcMappingProvider,
-)
+from synapse.handlers.oidc_handler import OidcError, OidcHandler, OidcMappingProvider
+from synapse.handlers.sso import MappingException
from synapse.types import UserID
from tests.unittest import HomeserverTestCase, override_config
@@ -94,6 +89,14 @@ class TestMappingProviderExtra(TestMappingProvider):
return {"phone": userinfo["phone"]}
+class TestMappingProviderFailures(TestMappingProvider):
+ async def map_user_attributes(self, userinfo, token, failures):
+ return {
+ "localpart": userinfo["username"] + (str(failures) if failures else ""),
+ "display_name": None,
+ }
+
+
def simple_async_mock(return_value=None, raises=None):
# AsyncMock is not available in python3.5, this mimics part of its behaviour
async def cb(*args, **kwargs):
@@ -132,14 +135,13 @@ class OidcHandlerTestCase(HomeserverTestCase):
config = self.default_config()
config["public_baseurl"] = BASE_URL
- oidc_config = {}
- oidc_config["enabled"] = True
- oidc_config["client_id"] = CLIENT_ID
- oidc_config["client_secret"] = CLIENT_SECRET
- oidc_config["issuer"] = ISSUER
- oidc_config["scopes"] = SCOPES
- oidc_config["user_mapping_provider"] = {
- "module": __name__ + ".TestMappingProvider",
+ oidc_config = {
+ "enabled": True,
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ "issuer": ISSUER,
+ "scopes": SCOPES,
+ "user_mapping_provider": {"module": __name__ + ".TestMappingProvider"},
}
# Update this config with what's in the default config so that
@@ -154,6 +156,12 @@ class OidcHandlerTestCase(HomeserverTestCase):
)
self.handler = OidcHandler(hs)
+ # Mock the render error method.
+ self.render_error = Mock(return_value=None)
+ self.handler._sso_handler.render_error = self.render_error
+
+ # Reduce the number of attempts when generating MXIDs.
+ self.handler._sso_handler._MAP_USERNAME_RETRIES = 3
return hs
@@ -161,12 +169,12 @@ class OidcHandlerTestCase(HomeserverTestCase):
return patch.dict(self.handler._provider_metadata, values)
def assertRenderedError(self, error, error_description=None):
- args = self.handler._render_error.call_args[0]
+ args = self.render_error.call_args[0]
self.assertEqual(args[1], error)
if error_description is not None:
self.assertEqual(args[2], error_description)
# Reset the render_error mock
- self.handler._render_error.reset_mock()
+ self.render_error.reset_mock()
def test_config(self):
"""Basic config correctly sets up the callback URL and client auth correctly."""
@@ -356,7 +364,6 @@ class OidcHandlerTestCase(HomeserverTestCase):
def test_callback_error(self):
"""Errors from the provider returned in the callback are displayed."""
- self.handler._render_error = Mock()
request = Mock(args={})
request.args[b"error"] = [b"invalid_client"]
self.get_success(self.handler.handle_oidc_callback(request))
@@ -387,7 +394,6 @@ class OidcHandlerTestCase(HomeserverTestCase):
"preferred_username": "bar",
}
user_id = "@foo:domain.org"
- self.handler._render_error = Mock(return_value=None)
self.handler._exchange_code = simple_async_mock(return_value=token)
self.handler._parse_id_token = simple_async_mock(return_value=userinfo)
self.handler._fetch_userinfo = simple_async_mock(return_value=userinfo)
@@ -435,7 +441,7 @@ class OidcHandlerTestCase(HomeserverTestCase):
userinfo, token, user_agent, ip_address
)
self.handler._fetch_userinfo.assert_not_called()
- self.handler._render_error.assert_not_called()
+ self.render_error.assert_not_called()
# Handle mapping errors
self.handler._map_userinfo_to_user = simple_async_mock(
@@ -469,7 +475,7 @@ class OidcHandlerTestCase(HomeserverTestCase):
userinfo, token, user_agent, ip_address
)
self.handler._fetch_userinfo.assert_called_once_with(token)
- self.handler._render_error.assert_not_called()
+ self.render_error.assert_not_called()
# Handle userinfo fetching error
self.handler._fetch_userinfo = simple_async_mock(raises=Exception())
@@ -485,7 +491,6 @@ class OidcHandlerTestCase(HomeserverTestCase):
def test_callback_session(self):
"""The callback verifies the session presence and validity"""
- self.handler._render_error = Mock(return_value=None)
request = Mock(spec=["args", "getCookie", "addCookie"])
# Missing cookie
@@ -699,19 +704,41 @@ class OidcHandlerTestCase(HomeserverTestCase):
),
MappingException,
)
- self.assertEqual(str(e.value), "mxid '@test_user_3:test' is already taken")
+ self.assertEqual(
+ str(e.value),
+ "Could not extract user attributes from SSO response: Mapping provider does not support de-duplicating Matrix IDs",
+ )
@override_config({"oidc_config": {"allow_existing_users": True}})
def test_map_userinfo_to_existing_user(self):
"""Existing users can log in with OpenID Connect when allow_existing_users is True."""
store = self.hs.get_datastore()
- user4 = UserID.from_string("@test_user_4:test")
+ user = UserID.from_string("@test_user:test")
self.get_success(
- store.register_user(user_id=user4.to_string(), password_hash=None)
+ store.register_user(user_id=user.to_string(), password_hash=None)
+ )
+
+ # Map a user via SSO.
+ userinfo = {
+ "sub": "test",
+ "username": "test_user",
+ }
+ token = {}
+ mxid = self.get_success(
+ self.handler._map_userinfo_to_user(
+ userinfo, token, "user-agent", "10.10.10.10"
+ )
)
+ self.assertEqual(mxid, "@test_user:test")
+
+ # Note that a second SSO user can be mapped to the same Matrix ID. (This
+ # requires a unique sub, but something that maps to the same matrix ID,
+ # in this case we'll just use the same username. A more realistic example
+ # would be subs which are email addresses, and mapping from the localpart
+ # of the email, e.g. bob@foo.com and bob@bar.com -> @bob:test.)
userinfo = {
- "sub": "test4",
- "username": "test_user_4",
+ "sub": "test1",
+ "username": "test_user",
}
token = {}
mxid = self.get_success(
@@ -719,4 +746,112 @@ class OidcHandlerTestCase(HomeserverTestCase):
userinfo, token, "user-agent", "10.10.10.10"
)
)
- self.assertEqual(mxid, "@test_user_4:test")
+ self.assertEqual(mxid, "@test_user:test")
+
+ # Register some non-exact matching cases.
+ user2 = UserID.from_string("@TEST_user_2:test")
+ self.get_success(
+ store.register_user(user_id=user2.to_string(), password_hash=None)
+ )
+ user2_caps = UserID.from_string("@test_USER_2:test")
+ self.get_success(
+ store.register_user(user_id=user2_caps.to_string(), password_hash=None)
+ )
+
+ # Attempting to login without matching a name exactly is an error.
+ userinfo = {
+ "sub": "test2",
+ "username": "TEST_USER_2",
+ }
+ e = self.get_failure(
+ self.handler._map_userinfo_to_user(
+ userinfo, token, "user-agent", "10.10.10.10"
+ ),
+ MappingException,
+ )
+ self.assertTrue(
+ str(e.value).startswith(
+ "Attempted to login as '@TEST_USER_2:test' but it matches more than one user inexactly:"
+ )
+ )
+
+ # Logging in when matching a name exactly should work.
+ user2 = UserID.from_string("@TEST_USER_2:test")
+ self.get_success(
+ store.register_user(user_id=user2.to_string(), password_hash=None)
+ )
+
+ mxid = self.get_success(
+ self.handler._map_userinfo_to_user(
+ userinfo, token, "user-agent", "10.10.10.10"
+ )
+ )
+ self.assertEqual(mxid, "@TEST_USER_2:test")
+
+ def test_map_userinfo_to_invalid_localpart(self):
+ """If the mapping provider generates an invalid localpart it should be rejected."""
+ userinfo = {
+ "sub": "test2",
+ "username": "föö",
+ }
+ token = {}
+
+ e = self.get_failure(
+ self.handler._map_userinfo_to_user(
+ userinfo, token, "user-agent", "10.10.10.10"
+ ),
+ MappingException,
+ )
+ self.assertEqual(str(e.value), "localpart is invalid: föö")
+
+ @override_config(
+ {
+ "oidc_config": {
+ "user_mapping_provider": {
+ "module": __name__ + ".TestMappingProviderFailures"
+ }
+ }
+ }
+ )
+ def test_map_userinfo_to_user_retries(self):
+ """The mapping provider can retry generating an MXID if the MXID is already in use."""
+ store = self.hs.get_datastore()
+ self.get_success(
+ store.register_user(user_id="@test_user:test", password_hash=None)
+ )
+ userinfo = {
+ "sub": "test",
+ "username": "test_user",
+ }
+ token = {}
+ mxid = self.get_success(
+ self.handler._map_userinfo_to_user(
+ userinfo, token, "user-agent", "10.10.10.10"
+ )
+ )
+ # test_user is already taken, so test_user1 gets registered instead.
+ self.assertEqual(mxid, "@test_user1:test")
+
+ # Register all of the potential users for a particular username.
+ self.get_success(
+ store.register_user(user_id="@tester:test", password_hash=None)
+ )
+ for i in range(1, 3):
+ self.get_success(
+ store.register_user(user_id="@tester%d:test" % i, password_hash=None)
+ )
+
+ # Now attempt to map to a username, this will fail since all potential usernames are taken.
+ userinfo = {
+ "sub": "tester",
+ "username": "tester",
+ }
+ e = self.get_failure(
+ self.handler._map_userinfo_to_user(
+ userinfo, token, "user-agent", "10.10.10.10"
+ ),
+ MappingException,
+ )
+ self.assertEqual(
+ str(e.value), "Unable to generate a Matrix ID from the SSO response"
+ )
|