summary refs log tree commit diff
path: root/tests/rest/client
diff options
context:
space:
mode:
Diffstat (limited to 'tests/rest/client')
-rw-r--r--tests/rest/client/test_auth.py32
-rw-r--r--tests/rest/client/test_login.py40
-rw-r--r--tests/rest/client/utils.py136
3 files changed, 120 insertions, 88 deletions
diff --git a/tests/rest/client/test_auth.py b/tests/rest/client/test_auth.py
index 090cef5216..ebf653d018 100644
--- a/tests/rest/client/test_auth.py
+++ b/tests/rest/client/test_auth.py
@@ -465,9 +465,11 @@ class UIAuthTests(unittest.HomeserverTestCase):
           * checking that the original operation succeeds
         """
 
+        fake_oidc_server = self.helper.fake_oidc_server()
+
         # log the user in
         remote_user_id = UserID.from_string(self.user).localpart
-        login_resp = self.helper.login_via_oidc(remote_user_id)
+        login_resp, _ = self.helper.login_via_oidc(fake_oidc_server, remote_user_id)
         self.assertEqual(login_resp["user_id"], self.user)
 
         # initiate a UI Auth process by attempting to delete the device
@@ -481,8 +483,8 @@ class UIAuthTests(unittest.HomeserverTestCase):
 
         # run the UIA-via-SSO flow
         session_id = channel.json_body["session"]
-        channel = self.helper.auth_via_oidc(
-            {"sub": remote_user_id}, ui_auth_session_id=session_id
+        channel, _ = self.helper.auth_via_oidc(
+            fake_oidc_server, {"sub": remote_user_id}, ui_auth_session_id=session_id
         )
 
         # that should serve a confirmation page
@@ -499,7 +501,8 @@ class UIAuthTests(unittest.HomeserverTestCase):
     @skip_unless(HAS_OIDC, "requires OIDC")
     @override_config({"oidc_config": TEST_OIDC_CONFIG})
     def test_does_not_offer_password_for_sso_user(self) -> None:
-        login_resp = self.helper.login_via_oidc("username")
+        fake_oidc_server = self.helper.fake_oidc_server()
+        login_resp, _ = self.helper.login_via_oidc(fake_oidc_server, "username")
         user_tok = login_resp["access_token"]
         device_id = login_resp["device_id"]
 
@@ -522,7 +525,10 @@ class UIAuthTests(unittest.HomeserverTestCase):
     @override_config({"oidc_config": TEST_OIDC_CONFIG})
     def test_offers_both_flows_for_upgraded_user(self) -> None:
         """A user that had a password and then logged in with SSO should get both flows"""
-        login_resp = self.helper.login_via_oidc(UserID.from_string(self.user).localpart)
+        fake_oidc_server = self.helper.fake_oidc_server()
+        login_resp, _ = self.helper.login_via_oidc(
+            fake_oidc_server, UserID.from_string(self.user).localpart
+        )
         self.assertEqual(login_resp["user_id"], self.user)
 
         channel = self.delete_device(
@@ -539,8 +545,13 @@ class UIAuthTests(unittest.HomeserverTestCase):
     @override_config({"oidc_config": TEST_OIDC_CONFIG})
     def test_ui_auth_fails_for_incorrect_sso_user(self) -> None:
         """If the user tries to authenticate with the wrong SSO user, they get an error"""
+
+        fake_oidc_server = self.helper.fake_oidc_server()
+
         # log the user in
-        login_resp = self.helper.login_via_oidc(UserID.from_string(self.user).localpart)
+        login_resp, _ = self.helper.login_via_oidc(
+            fake_oidc_server, UserID.from_string(self.user).localpart
+        )
         self.assertEqual(login_resp["user_id"], self.user)
 
         # start a UI Auth flow by attempting to delete a device
@@ -553,8 +564,8 @@ class UIAuthTests(unittest.HomeserverTestCase):
         session_id = channel.json_body["session"]
 
         # do the OIDC auth, but auth as the wrong user
-        channel = self.helper.auth_via_oidc(
-            {"sub": "wrong_user"}, ui_auth_session_id=session_id
+        channel, _ = self.helper.auth_via_oidc(
+            fake_oidc_server, {"sub": "wrong_user"}, ui_auth_session_id=session_id
         )
 
         # that should return a failure message
@@ -584,7 +595,10 @@ class UIAuthTests(unittest.HomeserverTestCase):
         """Tests that if we register a user via SSO while requiring approval for new
         accounts, we still raise the correct error before logging the user in.
         """
-        login_resp = self.helper.login_via_oidc("username", expected_status=403)
+        fake_oidc_server = self.helper.fake_oidc_server()
+        login_resp, _ = self.helper.login_via_oidc(
+            fake_oidc_server, "username", expected_status=403
+        )
 
         self.assertEqual(login_resp["errcode"], Codes.USER_AWAITING_APPROVAL)
         self.assertEqual(
diff --git a/tests/rest/client/test_login.py b/tests/rest/client/test_login.py
index e801ba8c8b..ff5baa9f0a 100644
--- a/tests/rest/client/test_login.py
+++ b/tests/rest/client/test_login.py
@@ -36,7 +36,7 @@ from synapse.util import Clock
 from tests import unittest
 from tests.handlers.test_oidc import HAS_OIDC
 from tests.handlers.test_saml import has_saml2
-from tests.rest.client.utils import TEST_OIDC_AUTH_ENDPOINT, TEST_OIDC_CONFIG
+from tests.rest.client.utils import TEST_OIDC_CONFIG
 from tests.server import FakeChannel
 from tests.test_utils.html_parsers import TestHtmlParser
 from tests.unittest import HomeserverTestCase, override_config, skip_unless
@@ -612,13 +612,16 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
     def test_login_via_oidc(self) -> None:
         """If OIDC is chosen, should redirect to the OIDC auth endpoint"""
 
-        # pick the default OIDC provider
-        channel = self.make_request(
-            "GET",
-            "/_synapse/client/pick_idp?redirectUrl="
-            + urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL)
-            + "&idp=oidc",
-        )
+        fake_oidc_server = self.helper.fake_oidc_server()
+
+        with fake_oidc_server.patch_homeserver(hs=self.hs):
+            # pick the default OIDC provider
+            channel = self.make_request(
+                "GET",
+                "/_synapse/client/pick_idp?redirectUrl="
+                + urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL)
+                + "&idp=oidc",
+            )
         self.assertEqual(channel.code, 302, channel.result)
         location_headers = channel.headers.getRawHeaders("Location")
         assert location_headers
@@ -626,7 +629,7 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
         oidc_uri_path, oidc_uri_query = oidc_uri.split("?", 1)
 
         # it should redirect us to the auth page of the OIDC server
-        self.assertEqual(oidc_uri_path, TEST_OIDC_AUTH_ENDPOINT)
+        self.assertEqual(oidc_uri_path, fake_oidc_server.authorization_endpoint)
 
         # ... and should have set a cookie including the redirect url
         cookie_headers = channel.headers.getRawHeaders("Set-Cookie")
@@ -643,7 +646,9 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
             TEST_CLIENT_REDIRECT_URL,
         )
 
-        channel = self.helper.complete_oidc_auth(oidc_uri, cookies, {"sub": "user1"})
+        channel, _ = self.helper.complete_oidc_auth(
+            fake_oidc_server, oidc_uri, cookies, {"sub": "user1"}
+        )
 
         # that should serve a confirmation page
         self.assertEqual(channel.code, 200, channel.result)
@@ -693,7 +698,10 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
 
     def test_client_idp_redirect_to_oidc(self) -> None:
         """If the client pick a known IdP, redirect to it"""
-        channel = self._make_sso_redirect_request("oidc")
+        fake_oidc_server = self.helper.fake_oidc_server()
+
+        with fake_oidc_server.patch_homeserver(hs=self.hs):
+            channel = self._make_sso_redirect_request("oidc")
         self.assertEqual(channel.code, 302, channel.result)
         location_headers = channel.headers.getRawHeaders("Location")
         assert location_headers
@@ -701,7 +709,7 @@ class MultiSSOTestCase(unittest.HomeserverTestCase):
         oidc_uri_path, oidc_uri_query = oidc_uri.split("?", 1)
 
         # it should redirect us to the auth page of the OIDC server
-        self.assertEqual(oidc_uri_path, TEST_OIDC_AUTH_ENDPOINT)
+        self.assertEqual(oidc_uri_path, fake_oidc_server.authorization_endpoint)
 
     def _make_sso_redirect_request(self, idp_prov: Optional[str] = None) -> FakeChannel:
         """Send a request to /_matrix/client/r0/login/sso/redirect
@@ -1280,9 +1288,13 @@ class UsernamePickerTestCase(HomeserverTestCase):
     def test_username_picker(self) -> None:
         """Test the happy path of a username picker flow."""
 
+        fake_oidc_server = self.helper.fake_oidc_server()
+
         # do the start of the login flow
-        channel = self.helper.auth_via_oidc(
-            {"sub": "tester", "displayname": "Jonny"}, TEST_CLIENT_REDIRECT_URL
+        channel, _ = self.helper.auth_via_oidc(
+            fake_oidc_server,
+            {"sub": "tester", "displayname": "Jonny"},
+            TEST_CLIENT_REDIRECT_URL,
         )
 
         # that should redirect to the username picker
diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py
index c249a42bb6..967d229223 100644
--- a/tests/rest/client/utils.py
+++ b/tests/rest/client/utils.py
@@ -31,7 +31,6 @@ from typing import (
     Tuple,
     overload,
 )
-from unittest.mock import patch
 from urllib.parse import urlencode
 
 import attr
@@ -46,8 +45,19 @@ from synapse.server import HomeServer
 from synapse.types import JsonDict
 
 from tests.server import FakeChannel, FakeSite, make_request
-from tests.test_utils import FakeResponse
 from tests.test_utils.html_parsers import TestHtmlParser
+from tests.test_utils.oidc import FakeAuthorizationGrant, FakeOidcServer
+
+# an 'oidc_config' suitable for login_via_oidc.
+TEST_OIDC_ISSUER = "https://issuer.test/"
+TEST_OIDC_CONFIG = {
+    "enabled": True,
+    "issuer": TEST_OIDC_ISSUER,
+    "client_id": "test-client-id",
+    "client_secret": "test-client-secret",
+    "scopes": ["openid"],
+    "user_mapping_provider": {"config": {"localpart_template": "{{ user.sub }}"}},
+}
 
 
 @attr.s(auto_attribs=True)
@@ -543,12 +553,28 @@ class RestHelper:
 
         return channel.json_body
 
+    def fake_oidc_server(self, issuer: str = TEST_OIDC_ISSUER) -> FakeOidcServer:
+        """Create a ``FakeOidcServer``.
+
+        This can be used in conjuction with ``login_via_oidc``::
+
+            fake_oidc_server = self.helper.fake_oidc_server()
+            login_data, _ = self.helper.login_via_oidc(fake_oidc_server, "user")
+        """
+
+        return FakeOidcServer(
+            clock=self.hs.get_clock(),
+            issuer=issuer,
+        )
+
     def login_via_oidc(
         self,
+        fake_server: FakeOidcServer,
         remote_user_id: str,
+        with_sid: bool = False,
         expected_status: int = 200,
-    ) -> JsonDict:
-        """Log in via OIDC
+    ) -> Tuple[JsonDict, FakeAuthorizationGrant]:
+        """Log in (as a new user) via OIDC
 
         Returns the result of the final token login.
 
@@ -560,7 +586,10 @@ class RestHelper:
         the normal places.
         """
         client_redirect_url = "https://x"
-        channel = self.auth_via_oidc({"sub": remote_user_id}, client_redirect_url)
+        userinfo = {"sub": remote_user_id}
+        channel, grant = self.auth_via_oidc(
+            fake_server, userinfo, client_redirect_url, with_sid=with_sid
+        )
 
         # expect a confirmation page
         assert channel.code == HTTPStatus.OK, channel.result
@@ -585,14 +614,16 @@ class RestHelper:
         assert (
             channel.code == expected_status
         ), f"unexpected status in response: {channel.code}"
-        return channel.json_body
+        return channel.json_body, grant
 
     def auth_via_oidc(
         self,
+        fake_server: FakeOidcServer,
         user_info_dict: JsonDict,
         client_redirect_url: Optional[str] = None,
         ui_auth_session_id: Optional[str] = None,
-    ) -> FakeChannel:
+        with_sid: bool = False,
+    ) -> Tuple[FakeChannel, FakeAuthorizationGrant]:
         """Perform an OIDC authentication flow via a mock OIDC provider.
 
         This can be used for either login or user-interactive auth.
@@ -616,6 +647,7 @@ class RestHelper:
                 the login redirect endpoint
             ui_auth_session_id: if set, we will perform a UI Auth flow. The session id
                 of the UI auth.
+            with_sid: if True, generates a random `sid` (OIDC session ID)
 
         Returns:
             A FakeChannel containing the result of calling the OIDC callback endpoint.
@@ -625,14 +657,15 @@ class RestHelper:
 
         cookies: Dict[str, str] = {}
 
-        # if we're doing a ui auth, hit the ui auth redirect endpoint
-        if ui_auth_session_id:
-            # can't set the client redirect url for UI Auth
-            assert client_redirect_url is None
-            oauth_uri = self.initiate_sso_ui_auth(ui_auth_session_id, cookies)
-        else:
-            # otherwise, hit the login redirect endpoint
-            oauth_uri = self.initiate_sso_login(client_redirect_url, cookies)
+        with fake_server.patch_homeserver(hs=self.hs):
+            # if we're doing a ui auth, hit the ui auth redirect endpoint
+            if ui_auth_session_id:
+                # can't set the client redirect url for UI Auth
+                assert client_redirect_url is None
+                oauth_uri = self.initiate_sso_ui_auth(ui_auth_session_id, cookies)
+            else:
+                # otherwise, hit the login redirect endpoint
+                oauth_uri = self.initiate_sso_login(client_redirect_url, cookies)
 
         # we now have a URI for the OIDC IdP, but we skip that and go straight
         # back to synapse's OIDC callback resource. However, we do need the "state"
@@ -640,17 +673,21 @@ class RestHelper:
         # that synapse passes to the client.
 
         oauth_uri_path, _ = oauth_uri.split("?", 1)
-        assert oauth_uri_path == TEST_OIDC_AUTH_ENDPOINT, (
+        assert oauth_uri_path == fake_server.authorization_endpoint, (
             "unexpected SSO URI " + oauth_uri_path
         )
-        return self.complete_oidc_auth(oauth_uri, cookies, user_info_dict)
+        return self.complete_oidc_auth(
+            fake_server, oauth_uri, cookies, user_info_dict, with_sid=with_sid
+        )
 
     def complete_oidc_auth(
         self,
+        fake_serer: FakeOidcServer,
         oauth_uri: str,
         cookies: Mapping[str, str],
         user_info_dict: JsonDict,
-    ) -> FakeChannel:
+        with_sid: bool = False,
+    ) -> Tuple[FakeChannel, FakeAuthorizationGrant]:
         """Mock out an OIDC authentication flow
 
         Assumes that an OIDC auth has been initiated by one of initiate_sso_login or
@@ -661,50 +698,37 @@ class RestHelper:
         Requires the OIDC callback resource to be mounted at the normal place.
 
         Args:
+            fake_server: the fake OIDC server with which the auth should be done
             oauth_uri: the OIDC URI returned by synapse's redirect endpoint (ie,
                from initiate_sso_login or initiate_sso_ui_auth).
             cookies: the cookies set by synapse's redirect endpoint, which will be
                sent back to the callback endpoint.
             user_info_dict: the remote userinfo that the OIDC provider should present.
                 Typically this should be '{"sub": "<remote user id>"}'.
+            with_sid: if True, generates a random `sid` (OIDC session ID)
 
         Returns:
             A FakeChannel containing the result of calling the OIDC callback endpoint.
         """
         _, oauth_uri_qs = oauth_uri.split("?", 1)
         params = urllib.parse.parse_qs(oauth_uri_qs)
+
+        code, grant = fake_serer.start_authorization(
+            scope=params["scope"][0],
+            userinfo=user_info_dict,
+            client_id=params["client_id"][0],
+            redirect_uri=params["redirect_uri"][0],
+            nonce=params["nonce"][0],
+            with_sid=with_sid,
+        )
+        state = params["state"][0]
+
         callback_uri = "%s?%s" % (
             urllib.parse.urlparse(params["redirect_uri"][0]).path,
-            urllib.parse.urlencode({"state": params["state"][0], "code": "TEST_CODE"}),
+            urllib.parse.urlencode({"state": state, "code": code}),
         )
 
-        # before we hit the callback uri, stub out some methods in the http client so
-        # that we don't have to handle full HTTPS requests.
-        # (expected url, json response) pairs, in the order we expect them.
-        expected_requests = [
-            # first we get a hit to the token endpoint, which we tell to return
-            # a dummy OIDC access token
-            (TEST_OIDC_TOKEN_ENDPOINT, {"access_token": "TEST"}),
-            # and then one to the user_info endpoint, which returns our remote user id.
-            (TEST_OIDC_USERINFO_ENDPOINT, user_info_dict),
-        ]
-
-        async def mock_req(
-            method: str,
-            uri: str,
-            data: Optional[dict] = None,
-            headers: Optional[Iterable[Tuple[AnyStr, AnyStr]]] = None,
-        ):
-            (expected_uri, resp_obj) = expected_requests.pop(0)
-            assert uri == expected_uri
-            resp = FakeResponse(
-                code=HTTPStatus.OK,
-                phrase=b"OK",
-                body=json.dumps(resp_obj).encode("utf-8"),
-            )
-            return resp
-
-        with patch.object(self.hs.get_proxied_http_client(), "request", mock_req):
+        with fake_serer.patch_homeserver(hs=self.hs):
             # now hit the callback URI with the right params and a made-up code
             channel = make_request(
                 self.hs.get_reactor(),
@@ -715,7 +739,7 @@ class RestHelper:
                     ("Cookie", "%s=%s" % (k, v)) for (k, v) in cookies.items()
                 ],
             )
-        return channel
+        return channel, grant
 
     def initiate_sso_login(
         self, client_redirect_url: Optional[str], cookies: MutableMapping[str, str]
@@ -806,21 +830,3 @@ class RestHelper:
         assert len(p.links) == 1, "not exactly one link in confirmation page"
         oauth_uri = p.links[0]
         return oauth_uri
-
-
-# an 'oidc_config' suitable for login_via_oidc.
-TEST_OIDC_AUTH_ENDPOINT = "https://issuer.test/auth"
-TEST_OIDC_TOKEN_ENDPOINT = "https://issuer.test/token"
-TEST_OIDC_USERINFO_ENDPOINT = "https://issuer.test/userinfo"
-TEST_OIDC_CONFIG = {
-    "enabled": True,
-    "discover": False,
-    "issuer": "https://issuer.test",
-    "client_id": "test-client-id",
-    "client_secret": "test-client-secret",
-    "scopes": ["profile"],
-    "authorization_endpoint": TEST_OIDC_AUTH_ENDPOINT,
-    "token_endpoint": TEST_OIDC_TOKEN_ENDPOINT,
-    "userinfo_endpoint": TEST_OIDC_USERINFO_ENDPOINT,
-    "user_mapping_provider": {"config": {"localpart_template": "{{ user.sub }}"}},
-}