Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion authentik/providers/saml/processors/authn_request_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class AuthNRequest:

name_id_policy: str = SAML_NAME_ID_FORMAT_UNSPECIFIED

force_authn: bool = False


class AuthNRequestParser:
"""AuthNRequest Parser"""
Expand Down Expand Up @@ -71,7 +73,14 @@ def _parse_xml(self, decoded_xml: str | bytes, relay_state: str | None) -> AuthN
self.logger.warning(msg)
raise CannotHandleAssertion(msg)

auth_n_request = AuthNRequest(id=root.attrib["ID"], relay_state=relay_state)
# `ForceAuthn` is an optional attribute. When true, the SP requires the IdP to
# actively re-authenticate the user instead of relying on an existing session
# (SAML 2.0 core §3.4.1).
force_authn = root.attrib.get("ForceAuthn", "false").lower() == "true"

auth_n_request = AuthNRequest(
id=root.attrib["ID"], relay_state=relay_state, force_authn=force_authn
)

# Check if AuthnRequest has a NameID Policy object
name_id_policies = root.findall(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy")
Expand Down
14 changes: 14 additions & 0 deletions authentik/providers/saml/tests/test_auth_n_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,3 +670,17 @@ def test_idp_initiated(self):
request = AuthNRequestParser(self.provider).idp_initiated()
self.assertEqual(request.id, None)
self.assertEqual(request.relay_state, self.provider.default_relay_state)
self.assertFalse(request.force_authn)

def test_force_authn(self):
"""Test that ForceAuthn from the SP is parsed from the AuthNRequest"""
self.source.force_authn = True
self.source.save()
http_request = self.request_factory.get("/")

request_proc = RequestProcessor(self.source, http_request, "test_state")
request = request_proc.build_auth_n()
parsed_request = AuthNRequestParser(self.provider).parse(
b64encode(request.encode()).decode(), "test_state"
)
self.assertTrue(parsed_request.force_authn)
35 changes: 34 additions & 1 deletion authentik/providers/saml/views/sso.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from authentik.core.models import Application
from authentik.events.models import Event, EventAction
from authentik.events.signals import get_login_event
from authentik.flows.exceptions import FlowNonApplicableException
from authentik.flows.models import in_memory_stage
from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO, FlowPlanner
Expand All @@ -18,7 +19,10 @@
from authentik.policies.views import PolicyAccessView
from authentik.providers.saml.exceptions import CannotHandleAssertion
from authentik.providers.saml.models import SAMLBindings, SAMLProvider
from authentik.providers.saml.processors.authn_request_parser import AuthNRequestParser
from authentik.providers.saml.processors.authn_request_parser import (
AuthNRequest,
AuthNRequestParser,
)
from authentik.providers.saml.views.flows import (
PLAN_CONTEXT_SAML_AUTH_N_REQUEST,
REQUEST_KEY_RELAY_STATE,
Expand All @@ -34,6 +38,8 @@

LOGGER = get_logger()

SESSION_KEY_LAST_LOGIN_UID = "authentik/providers/saml/last_login_uid"


class SAMLSSOView(PolicyAccessView):
"""SAML SSO Base View, which plans a flow and injects our final stage.
Expand All @@ -60,6 +66,15 @@ def get(self, request: HttpRequest, application_slug: str) -> HttpResponse:
method_response = self.check_saml_request()
if method_response:
return method_response
# If the SP requested ForceAuthn, we must re-authenticate the user regardless of
# any existing session, mirroring the OIDC provider's prompt=login handling.
auth_n_request: AuthNRequest | None = self.plan_context.get(
PLAN_CONTEXT_SAML_AUTH_N_REQUEST
)
if auth_n_request and auth_n_request.force_authn:
reauth_response = self.check_force_authn(request)
if reauth_response:
return reauth_response
# Regardless, we start the planner and return to it
planner = FlowPlanner(self.provider.authorization_flow)
planner.allow_empty_flows = True
Expand Down Expand Up @@ -91,6 +106,24 @@ def post(self, request: HttpRequest, application_slug: str) -> HttpResponse:
override .dispatch easily because PolicyAccessView's dispatch"""
return self.get(request, application_slug)

def check_force_authn(self, request: HttpRequest) -> HttpResponse | None:
"""Trigger re-authentication when the AuthnRequest carries ForceAuthn. Returns a
redirect into the authentication flow if re-auth is still required, otherwise
None to let the SSO flow proceed."""
login_event = get_login_event(request)
login_uid = str(login_event.pk) if login_event else None
# Re-authenticate if it hasn't happened yet: either we have no record of a forced
# login for this session, or the session's login event still matches the one we
# saw before redirecting to the authentication flow. Once the user re-logs in, a
# new login event is created, the UID changes, and we stop forcing re-auth.
if (
SESSION_KEY_LAST_LOGIN_UID not in request.session
or login_uid == request.session[SESSION_KEY_LAST_LOGIN_UID]
):
request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
return self.handle_no_permission()
return None


class SAMLSSOBindingRedirectView(SAMLSSOView):
"""SAML Handler for SSO/Redirect bindings, which are sent via GET"""
Expand Down
Loading