summary refs log tree commit diff
path: root/tests/handlers
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tests/handlers/test_oidc.py88
1 files changed, 87 insertions, 1 deletions
diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py
index b4fa02acc4..e880d32be6 100644
--- a/tests/handlers/test_oidc.py
+++ b/tests/handlers/test_oidc.py
@@ -89,6 +89,14 @@ class TestMappingProviderExtra(TestMappingProvider):
         return {"phone": userinfo["phone"]}
 
 
+class TestMappingProviderFailures(TestMappingProvider):
+    async def map_user_attributes(self, userinfo, token, failures):
+        return {
+            "localpart": userinfo["username"] + (str(failures) if failures else ""),
+            "display_name": None,
+        }
+
+
 def simple_async_mock(return_value=None, raises=None):
     # AsyncMock is not available in python3.5, this mimics part of its behaviour
     async def cb(*args, **kwargs):
@@ -152,6 +160,9 @@ class OidcHandlerTestCase(HomeserverTestCase):
         self.render_error = Mock(return_value=None)
         self.handler._sso_handler.render_error = self.render_error
 
+        # Reduce the number of attempts when generating MXIDs.
+        self.handler._sso_handler._MAP_USERNAME_RETRIES = 3
+
         return hs
 
     def metadata_edit(self, values):
@@ -693,7 +704,10 @@ class OidcHandlerTestCase(HomeserverTestCase):
             ),
             MappingException,
         )
-        self.assertEqual(str(e.value), "mxid '@test_user_3:test' is already taken")
+        self.assertEqual(
+            str(e.value),
+            "Could not extract user attributes from SSO response: Mapping provider does not support de-duplicating Matrix IDs",
+        )
 
     @override_config({"oidc_config": {"allow_existing_users": True}})
     def test_map_userinfo_to_existing_user(self):
@@ -703,6 +717,8 @@ class OidcHandlerTestCase(HomeserverTestCase):
         self.get_success(
             store.register_user(user_id=user.to_string(), password_hash=None)
         )
+
+        # Map a user via SSO.
         userinfo = {
             "sub": "test",
             "username": "test_user",
@@ -715,6 +731,23 @@ class OidcHandlerTestCase(HomeserverTestCase):
         )
         self.assertEqual(mxid, "@test_user:test")
 
+        # Note that a second SSO user can be mapped to the same Matrix ID. (This
+        # requires a unique sub, but something that maps to the same matrix ID,
+        # in this case we'll just use the same username. A more realistic example
+        # would be subs which are email addresses, and mapping from the localpart
+        # of the email, e.g. bob@foo.com and bob@bar.com -> @bob:test.)
+        userinfo = {
+            "sub": "test1",
+            "username": "test_user",
+        }
+        token = {}
+        mxid = self.get_success(
+            self.handler._map_userinfo_to_user(
+                userinfo, token, "user-agent", "10.10.10.10"
+            )
+        )
+        self.assertEqual(mxid, "@test_user:test")
+
         # Register some non-exact matching cases.
         user2 = UserID.from_string("@TEST_user_2:test")
         self.get_success(
@@ -762,6 +795,7 @@ class OidcHandlerTestCase(HomeserverTestCase):
             "username": "föö",
         }
         token = {}
+
         e = self.get_failure(
             self.handler._map_userinfo_to_user(
                 userinfo, token, "user-agent", "10.10.10.10"
@@ -769,3 +803,55 @@ class OidcHandlerTestCase(HomeserverTestCase):
             MappingException,
         )
         self.assertEqual(str(e.value), "localpart is invalid: föö")
+
+    @override_config(
+        {
+            "oidc_config": {
+                "user_mapping_provider": {
+                    "module": __name__ + ".TestMappingProviderFailures"
+                }
+            }
+        }
+    )
+    def test_map_userinfo_to_user_retries(self):
+        """The mapping provider can retry generating an MXID if the MXID is already in use."""
+        store = self.hs.get_datastore()
+        self.get_success(
+            store.register_user(user_id="@test_user:test", password_hash=None)
+        )
+        userinfo = {
+            "sub": "test",
+            "username": "test_user",
+        }
+        token = {}
+        mxid = self.get_success(
+            self.handler._map_userinfo_to_user(
+                userinfo, token, "user-agent", "10.10.10.10"
+            )
+        )
+        # test_user is already taken, so test_user1 gets registered instead.
+        self.assertEqual(mxid, "@test_user1:test")
+
+        # Register all of the potential users for a particular username.
+        self.get_success(
+            store.register_user(user_id="@tester:test", password_hash=None)
+        )
+        for i in range(1, 3):
+            self.get_success(
+                store.register_user(user_id="@tester%d:test" % i, password_hash=None)
+            )
+
+        # Now attempt to map to a username, this will fail since all potential usernames are taken.
+        userinfo = {
+            "sub": "tester",
+            "username": "tester",
+        }
+        e = self.get_failure(
+            self.handler._map_userinfo_to_user(
+                userinfo, token, "user-agent", "10.10.10.10"
+            ),
+            MappingException,
+        )
+        self.assertEqual(
+            str(e.value), "Unable to generate a Matrix ID from the SSO response"
+        )