Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -11613,6 +11613,16 @@
"type": "null"
}
]
},
"trusted_proxy_config": {
"anyOf": [
{
"$ref": "#/components/schemas/TrustedProxyConfiguration"
},
{
"type": "null"
}
]
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
},
"additionalProperties": false,
Expand Down Expand Up @@ -19757,6 +19767,57 @@
}
]
},
"TrustedProxyConfiguration": {
"properties": {
"user_header": {
"type": "string",
"title": "User identity header",
"description": "HTTP header containing the forwarded user identity.",
"default": "X-Forwarded-User"
},
"allowed_service_accounts": {
"anyOf": [
{
"items": {
"$ref": "#/components/schemas/TrustedProxyServiceAccount"
},
"type": "array"
},
{
"type": "null"
}
],
"title": "Allowed service accounts",
"description": "Optional allowlist of Kubernetes ServiceAccount identities permitted to act as trusted proxies. When set to null/omitted, any ServiceAccount with a valid token is accepted. When set to a non-empty list, only the listed ServiceAccounts are allowed. An empty list behaves the same as null (no restriction)."
}
},
"additionalProperties": false,
"type": "object",
"title": "TrustedProxyConfiguration",
"description": "Configuration for trusted-proxy auth module."
},
"TrustedProxyServiceAccount": {
"properties": {
"namespace": {
"type": "string",
"title": "Namespace",
"description": "Kubernetes namespace of the ServiceAccount."
},
"name": {
"type": "string",
"title": "Name",
"description": "Name of the Kubernetes ServiceAccount."
}
},
"additionalProperties": false,
"type": "object",
"required": [
"namespace",
"name"
],
"title": "TrustedProxyServiceAccount",
"description": "A Kubernetes ServiceAccount identity for trusted-proxy allowlist."
},
"UnauthorizedResponse": {
"properties": {
"status_code": {
Expand Down
8 changes: 7 additions & 1 deletion src/authentication/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
noop,
noop_with_token,
rh_identity,
trusted_proxy,
)
from authentication.interface import AuthInterface
from configuration import LogicError, configuration
Expand All @@ -18,7 +19,7 @@
logger = get_logger(__name__)


def get_auth_dependency(
def get_auth_dependency( # pylint: disable=too-many-return-statements
virtual_path: str = constants.DEFAULT_VIRTUAL_PATH,
) -> AuthInterface:
"""Select the configured authentication dependency interface.
Expand Down Expand Up @@ -82,6 +83,11 @@ def get_auth_dependency(
config=configuration.authentication_configuration.api_key_configuration,
virtual_path=virtual_path,
)
case constants.AUTH_MOD_TRUSTED_PROXY:
return trusted_proxy.TrustedProxyAuthDependency(
config=configuration.authentication_configuration.trusted_proxy_configuration,
virtual_path=virtual_path,
)
case _:
err_msg = f"Unsupported authentication module '{module}'"
logger.error(err_msg)
Expand Down
127 changes: 127 additions & 0 deletions src/authentication/trusted_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Trusted-proxy authentication module for requests forwarded by a K8s proxy."""

from typing import cast

import kubernetes.client
from fastapi import HTTPException, Request

from authentication.interface import NO_AUTH_TUPLE, AuthInterface, AuthTuple
from authentication.k8s import get_user_info
from authentication.utils import extract_user_token
from configuration import configuration
from constants import DEFAULT_VIRTUAL_PATH, NO_USER_TOKEN
from log import get_logger
from models.api.responses.error import ForbiddenResponse, UnauthorizedResponse
from models.config import TrustedProxyConfiguration

logger = get_logger(__name__)


class TrustedProxyAuthDependency(
AuthInterface
): # pylint: disable=too-few-public-methods
"""FastAPI dependency for trusted-proxy authentication.

Validates that the caller is an expected Kubernetes ServiceAccount
via TokenReview, then extracts the end user's identity from a
configurable HTTP header set by the proxy.
"""

def __init__(
self,
config: TrustedProxyConfiguration,
virtual_path: str = DEFAULT_VIRTUAL_PATH,
) -> None:
"""Initialize the trusted-proxy authentication dependency.

Parameters:
----------
config: Trusted-proxy configuration with user header
and optional SA allowlist.
virtual_path: The request path used for authorization checks;
defaults to DEFAULT_VIRTUAL_PATH.
"""
self.config = config
self.virtual_path = virtual_path
self.skip_userid_check = True

async def __call__(self, request: Request) -> AuthTuple:
"""Validate the proxy's SA token and extract forwarded user identity.

Parameters:
----------
request: The FastAPI request object.

Returns:
-------
AuthTuple with the forwarded user identity.

Raises:
------
HTTPException: If authentication fails.
"""
if not request.headers.get("Authorization"):
if configuration.authentication_configuration.skip_for_health_probes:
if request.url.path in ("/readiness", "/liveness"):
return NO_AUTH_TUPLE
if configuration.authentication_configuration.skip_for_metrics:
if request.url.path == "/metrics":
return NO_AUTH_TUPLE
Comment thread
coderabbitai[bot] marked this conversation as resolved.
response = UnauthorizedResponse(cause="Missing Authorization header")
raise HTTPException(**response.model_dump())

token = extract_user_token(request.headers)
user_info = get_user_info(token)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if user_info is None:
response = UnauthorizedResponse(
cause="Invalid or expired proxy service account token"
)
raise HTTPException(**response.model_dump())

user = cast(kubernetes.client.V1UserInfo, user_info.user)
if not user or not hasattr(user, "username"):
response = UnauthorizedResponse(
cause="Invalid service account token: missing user information"
)
raise HTTPException(**response.model_dump())

sa_username = cast(str, user.username)
if not sa_username:
response = UnauthorizedResponse(
cause="Invalid service account token: missing username"
)
raise HTTPException(**response.model_dump())

Comment thread
coderabbitai[bot] marked this conversation as resolved.
if self.config.allowed_service_accounts:
allowed = {
f"system:serviceaccount:{sa.namespace}:{sa.name}"
for sa in self.config.allowed_service_accounts
}
if sa_username not in allowed:
logger.warning(
"Service account '%s' is not in the trusted-proxy allowlist",
sa_username,
)
response = ForbiddenResponse.endpoint(user_id=sa_username)
raise HTTPException(**response.model_dump())

forwarded_user = (request.headers.get(self.config.user_header) or "").strip()
if not forwarded_user:
response = UnauthorizedResponse(
cause=f"Missing required header '{self.config.user_header}'"
)
raise HTTPException(**response.model_dump())

logger.debug(
"Trusted-proxy auth: proxy='%s', forwarded_user_present=%s",
sa_username,
True,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

return (
forwarded_user,
forwarded_user,
self.skip_userid_check,
NO_USER_TOKEN,
)
2 changes: 1 addition & 1 deletion src/authorization/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def get_authorization_resolvers() -> tuple[RolesResolver, AccessResolver]:
GenericAccessResolver(authorization_cfg.access_rules),
)

case constants.AUTH_MOD_RH_IDENTITY:
case constants.AUTH_MOD_RH_IDENTITY | constants.AUTH_MOD_TRUSTED_PROXY:
# rh-identity uses access rules for authorization, but doesn't extract
# roles from the identity header - all authenticated users get the "*" role
if len(authorization_cfg.access_rules) == 0:
Expand Down
2 changes: 2 additions & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
AUTH_MOD_APIKEY_TOKEN: Final[str] = "api-key-token"
AUTH_MOD_JWK_TOKEN: Final[str] = "jwk-token"
AUTH_MOD_RH_IDENTITY: Final[str] = "rh-identity"
AUTH_MOD_TRUSTED_PROXY: Final[str] = "trusted-proxy"
# Supported authentication modules
SUPPORTED_AUTHENTICATION_MODULES: Final[frozenset[str]] = frozenset(
{
Expand All @@ -128,6 +129,7 @@
AUTH_MOD_JWK_TOKEN,
AUTH_MOD_APIKEY_TOKEN,
AUTH_MOD_RH_IDENTITY,
AUTH_MOD_TRUSTED_PROXY,
}
)
DEFAULT_AUTHENTICATION_MODULE: Final[str] = AUTH_MOD_NOOP
Expand Down
62 changes: 62 additions & 0 deletions src/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,40 @@ class APIKeyTokenConfiguration(ConfigurationBase):
)


class TrustedProxyServiceAccount(ConfigurationBase):
"""A Kubernetes ServiceAccount identity for trusted-proxy allowlist."""

namespace: str = Field(
...,
title="Namespace",
description="Kubernetes namespace of the ServiceAccount.",
)
name: str = Field(
...,
title="Name",
description="Name of the Kubernetes ServiceAccount.",
)


class TrustedProxyConfiguration(ConfigurationBase):
"""Configuration for trusted-proxy auth module."""

user_header: str = Field(
"X-Forwarded-User",
title="User identity header",
description="HTTP header containing the forwarded user identity.",
)
allowed_service_accounts: Optional[list[TrustedProxyServiceAccount]] = Field(
None,
title="Allowed service accounts",
description="Optional allowlist of Kubernetes ServiceAccount identities "
"permitted to act as trusted proxies. "
"When set to null/omitted, any ServiceAccount with a valid token is accepted. "
"When set to a non-empty list, only the listed ServiceAccounts are allowed. "
"An empty list behaves the same as null (no restriction).",
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.


class AuthenticationConfiguration(ConfigurationBase):
"""Authentication configuration."""

Expand All @@ -1287,6 +1321,7 @@ class AuthenticationConfiguration(ConfigurationBase):
jwk_config: Optional[JwkConfiguration] = None
api_key_config: Optional[APIKeyTokenConfiguration] = None
rh_identity_config: Optional[RHIdentityConfiguration] = None
trusted_proxy_config: Optional[TrustedProxyConfiguration] = None

@model_validator(mode="after")
def check_authentication_model(self) -> Self:
Expand Down Expand Up @@ -1335,6 +1370,13 @@ def check_authentication_model(self) -> Self:
"api_key parameter must be specified when using API_KEY token authentication"
)

if self.module == constants.AUTH_MOD_TRUSTED_PROXY:
if self.trusted_proxy_config is None:
raise ValueError(
"Trusted proxy configuration must be specified "
"when using trusted-proxy authentication"
)

return self

@property
Expand Down Expand Up @@ -1388,6 +1430,26 @@ def api_key_configuration(self) -> APIKeyTokenConfiguration:
raise ValueError("API Key configuration should not be None")
return self.api_key_config

@property
def trusted_proxy_configuration(self) -> TrustedProxyConfiguration:
"""Return trusted-proxy configuration if the module is trusted-proxy.

Returns:
TrustedProxyConfiguration: The configured trusted-proxy settings.

Raises:
ValueError: If the active authentication module is not trusted-proxy.
ValueError: If the trusted-proxy configuration is missing.
"""
if self.module != constants.AUTH_MOD_TRUSTED_PROXY:
raise ValueError(
"Trusted proxy configuration is only available "
"for trusted-proxy authentication module"
)
if self.trusted_proxy_config is None:
raise ValueError("Trusted proxy configuration should not be None")
return self.trusted_proxy_config


@dataclass
class CustomProfile:
Expand Down
27 changes: 25 additions & 2 deletions tests/unit/authentication/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
"""Unit tests for functions defined in authentication/__init__.py"""

from authentication import get_auth_dependency, k8s, noop, noop_with_token
from authentication import (
get_auth_dependency,
k8s,
noop,
noop_with_token,
trusted_proxy,
)
from configuration import configuration
from constants import AUTH_MOD_K8S, AUTH_MOD_NOOP, AUTH_MOD_NOOP_WITH_TOKEN
from constants import (
AUTH_MOD_K8S,
AUTH_MOD_NOOP,
AUTH_MOD_NOOP_WITH_TOKEN,
AUTH_MOD_TRUSTED_PROXY,
)
from models.config import TrustedProxyConfiguration


def test_get_auth_dependency_noop() -> None:
Expand All @@ -27,3 +39,14 @@ def test_get_auth_dependency_k8s() -> None:
configuration.authentication_configuration.module = AUTH_MOD_K8S
auth_dependency = get_auth_dependency()
assert isinstance(auth_dependency, k8s.K8SAuthDependency)


def test_get_auth_dependency_trusted_proxy() -> None:
"""Test getting trusted-proxy authentication dependency."""
assert configuration.authentication_configuration is not None
configuration.authentication_configuration.module = AUTH_MOD_TRUSTED_PROXY
configuration.authentication_configuration.trusted_proxy_config = (
TrustedProxyConfiguration()
)
auth_dependency = get_auth_dependency()
assert isinstance(auth_dependency, trusted_proxy.TrustedProxyAuthDependency)
Loading
Loading