diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py
index 6835c6c462..1607e12935 100644
--- a/synapse/handlers/oidc_handler.py
+++ b/synapse/handlers/oidc_handler.py
@@ -14,7 +14,7 @@
# limitations under the License.
import inspect
import logging
-from typing import TYPE_CHECKING, Dict, Generic, List, Optional, Tuple, TypeVar
+from typing import TYPE_CHECKING, Dict, Generic, List, Optional, TypeVar
from urllib.parse import urlencode
import attr
@@ -35,7 +35,7 @@ from typing_extensions import TypedDict
from twisted.web.client import readBody
from synapse.config import ConfigError
-from synapse.handlers._base import BaseHandler
+from synapse.config.oidc_config import OidcProviderConfig
from synapse.handlers.sso import MappingException, UserAttributes
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
@@ -71,6 +71,144 @@ JWK = Dict[str, str]
JWKS = TypedDict("JWKS", {"keys": List[JWK]})
+class OidcHandler:
+ """Handles requests related to the OpenID Connect login flow.
+ """
+
+ def __init__(self, hs: "HomeServer"):
+ self._sso_handler = hs.get_sso_handler()
+
+ provider_confs = hs.config.oidc.oidc_providers
+ # we should not have been instantiated if there is no configured provider.
+ assert provider_confs
+
+ self._token_generator = OidcSessionTokenGenerator(hs)
+ self._providers = {
+ p.idp_id: OidcProvider(hs, self._token_generator, p) for p in provider_confs
+ } # type: Dict[str, OidcProvider]
+
+ async def load_metadata(self) -> None:
+ """Validate the config and load the metadata from the remote endpoint.
+
+ Called at startup to ensure we have everything we need.
+ """
+ for idp_id, p in self._providers.items():
+ try:
+ await p.load_metadata()
+ await p.load_jwks()
+ except Exception as e:
+ raise Exception(
+ "Error while initialising OIDC provider %r" % (idp_id,)
+ ) from e
+
+ async def handle_oidc_callback(self, request: SynapseRequest) -> None:
+ """Handle an incoming request to /_synapse/oidc/callback
+
+ Since we might want to display OIDC-related errors in a user-friendly
+ way, we don't raise SynapseError from here. Instead, we call
+ ``self._sso_handler.render_error`` which displays an HTML page for the error.
+
+ Most of the OpenID Connect logic happens here:
+
+ - first, we check if there was any error returned by the provider and
+ display it
+ - then we fetch the session cookie, decode and verify it
+ - the ``state`` query parameter should match with the one stored in the
+ session cookie
+
+ Once we know the session is legit, we then delegate to the OIDC Provider
+ implementation, which will exchange the code with the provider and complete the
+ login/authentication.
+
+ Args:
+ request: the incoming request from the browser.
+ """
+
+ # The provider might redirect with an error.
+ # In that case, just display it as-is.
+ if b"error" in request.args:
+ # error response from the auth server. see:
+ # https://tools.ietf.org/html/rfc6749#section-4.1.2.1
+ # https://openid.net/specs/openid-connect-core-1_0.html#AuthError
+ error = request.args[b"error"][0].decode()
+ description = request.args.get(b"error_description", [b""])[0].decode()
+
+ # Most of the errors returned by the provider could be due by
+ # either the provider misbehaving or Synapse being misconfigured.
+ # The only exception of that is "access_denied", where the user
+ # probably cancelled the login flow. In other cases, log those errors.
+ if error != "access_denied":
+ logger.error("Error from the OIDC provider: %s %s", error, description)
+
+ self._sso_handler.render_error(request, error, description)
+ return
+
+ # otherwise, it is presumably a successful response. see:
+ # https://tools.ietf.org/html/rfc6749#section-4.1.2
+
+ # Fetch the session cookie
+ session = request.getCookie(SESSION_COOKIE_NAME) # type: Optional[bytes]
+ if session is None:
+ logger.info("No session cookie found")
+ self._sso_handler.render_error(
+ request, "missing_session", "No session cookie found"
+ )
+ return
+
+ # Remove the cookie. There is a good chance that if the callback failed
+ # once, it will fail next time and the code will already be exchanged.
+ # Removing it early avoids spamming the provider with token requests.
+ request.addCookie(
+ SESSION_COOKIE_NAME,
+ b"",
+ path="/_synapse/oidc",
+ expires="Thu, Jan 01 1970 00:00:00 UTC",
+ httpOnly=True,
+ sameSite="lax",
+ )
+
+ # Check for the state query parameter
+ if b"state" not in request.args:
+ logger.info("State parameter is missing")
+ self._sso_handler.render_error(
+ request, "invalid_request", "State parameter is missing"
+ )
+ return
+
+ state = request.args[b"state"][0].decode()
+
+ # Deserialize the session token and verify it.
+ try:
+ session_data = self._token_generator.verify_oidc_session_token(
+ session, state
+ )
+ except (MacaroonDeserializationException, ValueError) as e:
+ logger.exception("Invalid session")
+ self._sso_handler.render_error(request, "invalid_session", str(e))
+ return
+ except MacaroonInvalidSignatureException as e:
+ logger.exception("Could not verify session")
+ self._sso_handler.render_error(request, "mismatching_session", str(e))
+ return
+
+ oidc_provider = self._providers.get(session_data.idp_id)
+ if not oidc_provider:
+ logger.error("OIDC session uses unknown IdP %r", oidc_provider)
+ self._sso_handler.render_error(request, "unknown_idp", "Unknown IdP")
+ return
+
+ if b"code" not in request.args:
+ logger.info("Code parameter is missing")
+ self._sso_handler.render_error(
+ request, "invalid_request", "Code parameter is missing"
+ )
+ return
+
+ code = request.args[b"code"][0].decode()
+
+ await oidc_provider.handle_oidc_callback(request, session_data, code)
+
+
class OidcError(Exception):
"""Used to catch errors when calling the token_endpoint
"""
@@ -85,44 +223,56 @@ class OidcError(Exception):
return self.error
-class OidcHandler(BaseHandler):
- """Handles requests related to the OpenID Connect login flow.
+class OidcProvider:
+ """Wraps the config for a single OIDC IdentityProvider
+
+ Provides methods for handling redirect requests and callbacks via that particular
+ IdP.
"""
- def __init__(self, hs: "HomeServer"):
- super().__init__(hs)
+ def __init__(
+ self,
+ hs: "HomeServer",
+ token_generator: "OidcSessionTokenGenerator",
+ provider: OidcProviderConfig,
+ ):
+ self._store = hs.get_datastore()
+
+ self._token_generator = token_generator
+
self._callback_url = hs.config.oidc_callback_url # type: str
- self._scopes = hs.config.oidc_scopes # type: List[str]
- self._user_profile_method = hs.config.oidc_user_profile_method # type: str
+
+ self._scopes = provider.scopes
+ self._user_profile_method = provider.user_profile_method
self._client_auth = ClientAuth(
- hs.config.oidc_client_id,
- hs.config.oidc_client_secret,
- hs.config.oidc_client_auth_method,
+ provider.client_id, provider.client_secret, provider.client_auth_method,
) # type: ClientAuth
- self._client_auth_method = hs.config.oidc_client_auth_method # type: str
+ self._client_auth_method = provider.client_auth_method
self._provider_metadata = OpenIDProviderMetadata(
- issuer=hs.config.oidc_issuer,
- authorization_endpoint=hs.config.oidc_authorization_endpoint,
- token_endpoint=hs.config.oidc_token_endpoint,
- userinfo_endpoint=hs.config.oidc_userinfo_endpoint,
- jwks_uri=hs.config.oidc_jwks_uri,
+ issuer=provider.issuer,
+ authorization_endpoint=provider.authorization_endpoint,
+ token_endpoint=provider.token_endpoint,
+ userinfo_endpoint=provider.userinfo_endpoint,
+ jwks_uri=provider.jwks_uri,
) # type: OpenIDProviderMetadata
- self._provider_needs_discovery = hs.config.oidc_discover # type: bool
- self._user_mapping_provider = hs.config.oidc_user_mapping_provider_class(
- hs.config.oidc_user_mapping_provider_config
- ) # type: OidcMappingProvider
- self._skip_verification = hs.config.oidc_skip_verification # type: bool
- self._allow_existing_users = hs.config.oidc_allow_existing_users # type: bool
+ self._provider_needs_discovery = provider.discover
+ self._user_mapping_provider = provider.user_mapping_provider_class(
+ provider.user_mapping_provider_config
+ )
+ self._skip_verification = provider.skip_verification
+ self._allow_existing_users = provider.allow_existing_users
self._http_client = hs.get_proxied_http_client()
self._server_name = hs.config.server_name # type: str
- self._macaroon_secret_key = hs.config.macaroon_secret_key
# identifier for the external_ids table
- self.idp_id = "oidc"
+ self.idp_id = provider.idp_id
# user-facing name of this auth provider
- self.idp_name = "OIDC"
+ self.idp_name = provider.idp_name
+
+ # MXC URI for icon for this auth provider
+ self.idp_icon = provider.idp_icon
self._sso_handler = hs.get_sso_handler()
@@ -519,11 +669,14 @@ class OidcHandler(BaseHandler):
if not client_redirect_url:
client_redirect_url = b""
- cookie = self._generate_oidc_session_token(
+ cookie = self._token_generator.generate_oidc_session_token(
state=state,
- nonce=nonce,
- client_redirect_url=client_redirect_url.decode(),
- ui_auth_session_id=ui_auth_session_id,
+ session_data=OidcSessionData(
+ idp_id=self.idp_id,
+ nonce=nonce,
+ client_redirect_url=client_redirect_url.decode(),
+ ui_auth_session_id=ui_auth_session_id,
+ ),
)
request.addCookie(
SESSION_COOKIE_NAME,
@@ -546,22 +699,16 @@ class OidcHandler(BaseHandler):
nonce=nonce,
)
- async def handle_oidc_callback(self, request: SynapseRequest) -> None:
+ async def handle_oidc_callback(
+ self, request: SynapseRequest, session_data: "OidcSessionData", code: str
+ ) -> None:
"""Handle an incoming request to /_synapse/oidc/callback
- Since we might want to display OIDC-related errors in a user-friendly
- way, we don't raise SynapseError from here. Instead, we call
- ``self._sso_handler.render_error`` which displays an HTML page for the error.
+ By this time we have already validated the session on the synapse side, and
+ now need to do the provider-specific operations. This includes:
- Most of the OpenID Connect logic happens here:
-
- - first, we check if there was any error returned by the provider and
- display it
- - then we fetch the session cookie, decode and verify it
- - the ``state`` query parameter should match with the one stored in the
- session cookie
- - once we known this session is legit, exchange the code with the
- provider using the ``token_endpoint`` (see ``_exchange_code``)
+ - exchange the code with the provider using the ``token_endpoint`` (see
+ ``_exchange_code``)
- once we have the token, use it to either extract the UserInfo from
the ``id_token`` (``_parse_id_token``), or use the ``access_token``
to fetch UserInfo from the ``userinfo_endpoint``
@@ -571,88 +718,12 @@ class OidcHandler(BaseHandler):
Args:
request: the incoming request from the browser.
+ session_data: the session data, extracted from our cookie
+ code: The authorization code we got from the callback.
"""
-
- # The provider might redirect with an error.
- # In that case, just display it as-is.
- if b"error" in request.args:
- # error response from the auth server. see:
- # https://tools.ietf.org/html/rfc6749#section-4.1.2.1
- # https://openid.net/specs/openid-connect-core-1_0.html#AuthError
- error = request.args[b"error"][0].decode()
- description = request.args.get(b"error_description", [b""])[0].decode()
-
- # Most of the errors returned by the provider could be due by
- # either the provider misbehaving or Synapse being misconfigured.
- # The only exception of that is "access_denied", where the user
- # probably cancelled the login flow. In other cases, log those errors.
- if error != "access_denied":
- logger.error("Error from the OIDC provider: %s %s", error, description)
-
- self._sso_handler.render_error(request, error, description)
- return
-
- # otherwise, it is presumably a successful response. see:
- # https://tools.ietf.org/html/rfc6749#section-4.1.2
-
- # Fetch the session cookie
- session = request.getCookie(SESSION_COOKIE_NAME) # type: Optional[bytes]
- if session is None:
- logger.info("No session cookie found")
- self._sso_handler.render_error(
- request, "missing_session", "No session cookie found"
- )
- return
-
- # Remove the cookie. There is a good chance that if the callback failed
- # once, it will fail next time and the code will already be exchanged.
- # Removing it early avoids spamming the provider with token requests.
- request.addCookie(
- SESSION_COOKIE_NAME,
- b"",
- path="/_synapse/oidc",
- expires="Thu, Jan 01 1970 00:00:00 UTC",
- httpOnly=True,
- sameSite="lax",
- )
-
- # Check for the state query parameter
- if b"state" not in request.args:
- logger.info("State parameter is missing")
- self._sso_handler.render_error(
- request, "invalid_request", "State parameter is missing"
- )
- return
-
- state = request.args[b"state"][0].decode()
-
- # Deserialize the session token and verify it.
- try:
- (
- nonce,
- client_redirect_url,
- ui_auth_session_id,
- ) = self._verify_oidc_session_token(session, state)
- except MacaroonDeserializationException as e:
- logger.exception("Invalid session")
- self._sso_handler.render_error(request, "invalid_session", str(e))
- return
- except MacaroonInvalidSignatureException as e:
- logger.exception("Could not verify session")
- self._sso_handler.render_error(request, "mismatching_session", str(e))
- return
-
# Exchange the code with the provider
- if b"code" not in request.args:
- logger.info("Code parameter is missing")
- self._sso_handler.render_error(
- request, "invalid_request", "Code parameter is missing"
- )
- return
-
- logger.debug("Exchanging code")
- code = request.args[b"code"][0].decode()
try:
+ logger.debug("Exchanging code")
token = await self._exchange_code(code)
except OidcError as e:
logger.exception("Could not exchange code")
@@ -674,14 +745,14 @@ class OidcHandler(BaseHandler):
else:
logger.debug("Extracting userinfo from id_token")
try:
- userinfo = await self._parse_id_token(token, nonce=nonce)
+ userinfo = await self._parse_id_token(token, nonce=session_data.nonce)
except Exception as e:
logger.exception("Invalid id_token")
self._sso_handler.render_error(request, "invalid_token", str(e))
return
# first check if we're doing a UIA
- if ui_auth_session_id:
+ if session_data.ui_auth_session_id:
try:
remote_user_id = self._remote_id_from_userinfo(userinfo)
except Exception as e:
@@ -690,7 +761,7 @@ class OidcHandler(BaseHandler):
return
return await self._sso_handler.complete_sso_ui_auth_request(
- self.idp_id, remote_user_id, ui_auth_session_id, request
+ self.idp_id, remote_user_id, session_data.ui_auth_session_id, request
)
# otherwise, it's a login
@@ -698,133 +769,12 @@ class OidcHandler(BaseHandler):
# Call the mapper to register/login the user
try:
await self._complete_oidc_login(
- userinfo, token, request, client_redirect_url
+ userinfo, token, request, session_data.client_redirect_url
)
except MappingException as e:
logger.exception("Could not map user")
self._sso_handler.render_error(request, "mapping_error", str(e))
- def _generate_oidc_session_token(
- self,
- state: str,
- nonce: str,
- client_redirect_url: str,
- ui_auth_session_id: Optional[str],
- duration_in_ms: int = (60 * 60 * 1000),
- ) -> str:
- """Generates a signed token storing data about an OIDC session.
-
- When Synapse initiates an authorization flow, it creates a random state
- and a random nonce. Those parameters are given to the provider and
- should be verified when the client comes back from the provider.
- It is also used to store the client_redirect_url, which is used to
- complete the SSO login flow.
-
- Args:
- state: The ``state`` parameter passed to the OIDC provider.
- nonce: The ``nonce`` parameter passed to the OIDC provider.
- client_redirect_url: The URL the client gave when it initiated the
- flow.
- ui_auth_session_id: The session ID of the ongoing UI Auth (or
- None if this is a login).
- duration_in_ms: An optional duration for the token in milliseconds.
- Defaults to an hour.
-
- Returns:
- A signed macaroon token with the session information.
- """
- macaroon = pymacaroons.Macaroon(
- location=self._server_name, identifier="key", key=self._macaroon_secret_key,
- )
- macaroon.add_first_party_caveat("gen = 1")
- macaroon.add_first_party_caveat("type = session")
- macaroon.add_first_party_caveat("state = %s" % (state,))
- macaroon.add_first_party_caveat("nonce = %s" % (nonce,))
- macaroon.add_first_party_caveat(
- "client_redirect_url = %s" % (client_redirect_url,)
- )
- if ui_auth_session_id:
- macaroon.add_first_party_caveat(
- "ui_auth_session_id = %s" % (ui_auth_session_id,)
- )
- now = self.clock.time_msec()
- expiry = now + duration_in_ms
- macaroon.add_first_party_caveat("time < %d" % (expiry,))
-
- return macaroon.serialize()
-
- def _verify_oidc_session_token(
- self, session: bytes, state: str
- ) -> Tuple[str, str, Optional[str]]:
- """Verifies and extract an OIDC session token.
-
- This verifies that a given session token was issued by this homeserver
- and extract the nonce and client_redirect_url caveats.
-
- Args:
- session: The session token to verify
- state: The state the OIDC provider gave back
-
- Returns:
- The nonce, client_redirect_url, and ui_auth_session_id for this session
- """
- macaroon = pymacaroons.Macaroon.deserialize(session)
-
- v = pymacaroons.Verifier()
- v.satisfy_exact("gen = 1")
- v.satisfy_exact("type = session")
- v.satisfy_exact("state = %s" % (state,))
- v.satisfy_general(lambda c: c.startswith("nonce = "))
- v.satisfy_general(lambda c: c.startswith("client_redirect_url = "))
- # Sometimes there's a UI auth session ID, it seems to be OK to attempt
- # to always satisfy this.
- v.satisfy_general(lambda c: c.startswith("ui_auth_session_id = "))
- v.satisfy_general(self._verify_expiry)
-
- v.verify(macaroon, self._macaroon_secret_key)
-
- # Extract the `nonce`, `client_redirect_url`, and maybe the
- # `ui_auth_session_id` from the token.
- nonce = self._get_value_from_macaroon(macaroon, "nonce")
- client_redirect_url = self._get_value_from_macaroon(
- macaroon, "client_redirect_url"
- )
- try:
- ui_auth_session_id = self._get_value_from_macaroon(
- macaroon, "ui_auth_session_id"
- ) # type: Optional[str]
- except ValueError:
- ui_auth_session_id = None
-
- return nonce, client_redirect_url, ui_auth_session_id
-
- def _get_value_from_macaroon(self, macaroon: pymacaroons.Macaroon, key: str) -> str:
- """Extracts a caveat value from a macaroon token.
-
- Args:
- macaroon: the token
- key: the key of the caveat to extract
-
- Returns:
- The extracted value
-
- Raises:
- Exception: if the caveat was not in the macaroon
- """
- prefix = key + " = "
- for caveat in macaroon.caveats:
- if caveat.caveat_id.startswith(prefix):
- return caveat.caveat_id[len(prefix) :]
- raise ValueError("No %s caveat in macaroon" % (key,))
-
- def _verify_expiry(self, caveat: str) -> bool:
- prefix = "time < "
- if not caveat.startswith(prefix):
- return False
- expiry = int(caveat[len(prefix) :])
- now = self.clock.time_msec()
- return now < expiry
-
async def _complete_oidc_login(
self,
userinfo: UserInfo,
@@ -901,8 +851,8 @@ class OidcHandler(BaseHandler):
# and attempt to match it.
attributes = await oidc_response_to_user_attributes(failures=0)
- user_id = UserID(attributes.localpart, self.server_name).to_string()
- users = await self.store.get_users_by_id_case_insensitive(user_id)
+ user_id = UserID(attributes.localpart, self._server_name).to_string()
+ users = await self._store.get_users_by_id_case_insensitive(user_id)
if users:
# If an existing matrix ID is returned, then use it.
if len(users) == 1:
@@ -954,6 +904,157 @@ class OidcHandler(BaseHandler):
return str(remote_user_id)
+class OidcSessionTokenGenerator:
+ """Methods for generating and checking OIDC Session cookies."""
+
+ def __init__(self, hs: "HomeServer"):
+ self._clock = hs.get_clock()
+ self._server_name = hs.hostname
+ self._macaroon_secret_key = hs.config.key.macaroon_secret_key
+
+ def generate_oidc_session_token(
+ self,
+ state: str,
+ session_data: "OidcSessionData",
+ duration_in_ms: int = (60 * 60 * 1000),
+ ) -> str:
+ """Generates a signed token storing data about an OIDC session.
+
+ When Synapse initiates an authorization flow, it creates a random state
+ and a random nonce. Those parameters are given to the provider and
+ should be verified when the client comes back from the provider.
+ It is also used to store the client_redirect_url, which is used to
+ complete the SSO login flow.
+
+ Args:
+ state: The ``state`` parameter passed to the OIDC provider.
+ session_data: data to include in the session token.
+ duration_in_ms: An optional duration for the token in milliseconds.
+ Defaults to an hour.
+
+ Returns:
+ A signed macaroon token with the session information.
+ """
+ macaroon = pymacaroons.Macaroon(
+ location=self._server_name, identifier="key", key=self._macaroon_secret_key,
+ )
+ macaroon.add_first_party_caveat("gen = 1")
+ macaroon.add_first_party_caveat("type = session")
+ macaroon.add_first_party_caveat("state = %s" % (state,))
+ macaroon.add_first_party_caveat("idp_id = %s" % (session_data.idp_id,))
+ macaroon.add_first_party_caveat("nonce = %s" % (session_data.nonce,))
+ macaroon.add_first_party_caveat(
+ "client_redirect_url = %s" % (session_data.client_redirect_url,)
+ )
+ if session_data.ui_auth_session_id:
+ macaroon.add_first_party_caveat(
+ "ui_auth_session_id = %s" % (session_data.ui_auth_session_id,)
+ )
+ now = self._clock.time_msec()
+ expiry = now + duration_in_ms
+ macaroon.add_first_party_caveat("time < %d" % (expiry,))
+
+ return macaroon.serialize()
+
+ def verify_oidc_session_token(
+ self, session: bytes, state: str
+ ) -> "OidcSessionData":
+ """Verifies and extract an OIDC session token.
+
+ This verifies that a given session token was issued by this homeserver
+ and extract the nonce and client_redirect_url caveats.
+
+ Args:
+ session: The session token to verify
+ state: The state the OIDC provider gave back
+
+ Returns:
+ The data extracted from the session cookie
+
+ Raises:
+ ValueError if an expected caveat is missing from the macaroon.
+ """
+ macaroon = pymacaroons.Macaroon.deserialize(session)
+
+ v = pymacaroons.Verifier()
+ v.satisfy_exact("gen = 1")
+ v.satisfy_exact("type = session")
+ v.satisfy_exact("state = %s" % (state,))
+ v.satisfy_general(lambda c: c.startswith("nonce = "))
+ v.satisfy_general(lambda c: c.startswith("idp_id = "))
+ v.satisfy_general(lambda c: c.startswith("client_redirect_url = "))
+ # Sometimes there's a UI auth session ID, it seems to be OK to attempt
+ # to always satisfy this.
+ v.satisfy_general(lambda c: c.startswith("ui_auth_session_id = "))
+ v.satisfy_general(self._verify_expiry)
+
+ v.verify(macaroon, self._macaroon_secret_key)
+
+ # Extract the session data from the token.
+ nonce = self._get_value_from_macaroon(macaroon, "nonce")
+ idp_id = self._get_value_from_macaroon(macaroon, "idp_id")
+ client_redirect_url = self._get_value_from_macaroon(
+ macaroon, "client_redirect_url"
+ )
+ try:
+ ui_auth_session_id = self._get_value_from_macaroon(
+ macaroon, "ui_auth_session_id"
+ ) # type: Optional[str]
+ except ValueError:
+ ui_auth_session_id = None
+
+ return OidcSessionData(
+ nonce=nonce,
+ idp_id=idp_id,
+ client_redirect_url=client_redirect_url,
+ ui_auth_session_id=ui_auth_session_id,
+ )
+
+ def _get_value_from_macaroon(self, macaroon: pymacaroons.Macaroon, key: str) -> str:
+ """Extracts a caveat value from a macaroon token.
+
+ Args:
+ macaroon: the token
+ key: the key of the caveat to extract
+
+ Returns:
+ The extracted value
+
+ Raises:
+ ValueError: if the caveat was not in the macaroon
+ """
+ prefix = key + " = "
+ for caveat in macaroon.caveats:
+ if caveat.caveat_id.startswith(prefix):
+ return caveat.caveat_id[len(prefix) :]
+ raise ValueError("No %s caveat in macaroon" % (key,))
+
+ def _verify_expiry(self, caveat: str) -> bool:
+ prefix = "time < "
+ if not caveat.startswith(prefix):
+ return False
+ expiry = int(caveat[len(prefix) :])
+ now = self._clock.time_msec()
+ return now < expiry
+
+
+@attr.s(frozen=True, slots=True)
+class OidcSessionData:
+ """The attributes which are stored in a OIDC session cookie"""
+
+ # the Identity Provider being used
+ idp_id = attr.ib(type=str)
+
+ # The `nonce` parameter passed to the OIDC provider.
+ nonce = attr.ib(type=str)
+
+ # The URL the client gave when it initiated the flow. ("" if this is a UI Auth)
+ client_redirect_url = attr.ib(type=str)
+
+ # The session ID of the ongoing UI Auth (None if this is a login)
+ ui_auth_session_id = attr.ib(type=Optional[str], default=None)
+
+
UserAttributeDict = TypedDict(
"UserAttributeDict", {"localpart": Optional[str], "display_name": Optional[str]}
)
|