11"""Red Hat Identity header authentication for FastAPI endpoints.
22
3- This module provides authentication via the x-rh-identity header, supporting both
4- User and System identity types with optional entitlement validation.
3+ This module provides authentication via the x-rh-identity header, supporting
4+ User, System, and ServiceAccount identity types with optional entitlement
5+ validation.
56"""
67
78import base64
@@ -36,9 +37,10 @@ def _get_request_id(request: Request) -> str:
3637class RHIdentityData :
3738 """Extracts and validates Red Hat Identity header data.
3839
39- Supports two identity types:
40+ Supports three identity types:
4041 - User: Console users with user_id, username, is_org_admin
4142 - System: Certificate-authenticated RHEL systems with cn as identifier
43+ - ServiceAccount: OAuth service accounts with client_id, username, user_id
4244 """
4345
4446 def __init__ (
@@ -82,6 +84,8 @@ def _validate_structure(self) -> None:
8284 self ._validate_user_fields (identity )
8385 elif identity_type == "System" :
8486 self ._validate_system_fields (identity )
87+ elif identity_type == "ServiceAccount" :
88+ self ._validate_service_account_fields (identity )
8589 else :
8690 logger .warning ("Identity validation failed: unsupported identity type" )
8791 raise HTTPException (status_code = 400 , detail = "Invalid identity data" )
@@ -153,6 +157,31 @@ def _validate_system_fields(self, identity: dict) -> None:
153157 if account_number is not None and account_number != "" :
154158 self ._validate_string_field ("account_number" , account_number )
155159
160+ def _validate_service_account_fields (self , identity : dict ) -> None :
161+ """Validate required fields for ServiceAccount identity type.
162+
163+ Args:
164+ identity: The identity dict containing service_account data
165+
166+ Raises:
167+ HTTPException: 400 if required ServiceAccount fields are missing or malformed
168+ """
169+ if "service_account" not in identity :
170+ logger .warning (
171+ "Identity validation failed: missing 'service_account' field "
172+ "for ServiceAccount type"
173+ )
174+ raise HTTPException (status_code = 400 , detail = "Invalid identity data" )
175+ service_account = identity ["service_account" ]
176+ for field in ("client_id" , "username" ):
177+ if field not in service_account :
178+ logger .warning (
179+ "Identity validation failed: missing '%s' in service_account data" ,
180+ field ,
181+ )
182+ raise HTTPException (status_code = 400 , detail = "Invalid identity data" )
183+ self ._validate_string_field (field , service_account [field ])
184+
156185 def _validate_string_field (
157186 self , field_name : str , value : Any , max_length : int = 256
158187 ) -> None :
@@ -194,7 +223,7 @@ def _validate_string_field(
194223 raise HTTPException (status_code = 400 , detail = "Invalid identity data" )
195224
196225 def _get_identity_type (self ) -> str :
197- """Get the identity type (User or System ).
226+ """Get the identity type (User, System, or ServiceAccount ).
198227
199228 Returns:
200229 Identity type string
@@ -205,12 +234,16 @@ def get_user_id(self) -> str:
205234 """Extract user ID based on identity type.
206235
207236 Returns:
208- User ID (user.user_id for User type, system.cn for System type)
237+ User ID (user.user_id for User type, system.cn for System type,
238+ service_account.client_id for ServiceAccount type)
209239 """
210240 identity = self .identity_data ["identity" ]
241+ identity_type = self ._get_identity_type ()
211242
212- if self . _get_identity_type () == "User" :
243+ if identity_type == "User" :
213244 return identity ["user" ]["user_id" ]
245+ if identity_type == "ServiceAccount" :
246+ return identity ["service_account" ]["client_id" ]
214247 return identity ["system" ]["cn" ]
215248
216249 def get_username (self ) -> str :
@@ -222,14 +255,19 @@ def get_username(self) -> str:
222255 a stable non-empty identifier in that case.
223256
224257 Returns:
225- Username (user.username for User type; account_number or system.cn
226- for System type)
258+ Username (user.username for User type;
259+ service_account.username for ServiceAccount type;
260+ account_number or system.cn for System type)
227261 """
228262 identity = self .identity_data ["identity" ]
263+ identity_type = self ._get_identity_type ()
229264
230- if self . _get_identity_type () == "User" :
265+ if identity_type == "User" :
231266 return identity ["user" ]["username" ]
232267
268+ if identity_type == "ServiceAccount" :
269+ return identity ["service_account" ]["username" ]
270+
233271 account_number = identity .get ("account_number" )
234272 if account_number :
235273 return account_number
@@ -302,7 +340,8 @@ class RHIdentityAuthDependency(AuthInterface): # pylint: disable=too-few-public
302340 """Red Hat Identity header authentication dependency for FastAPI.
303341
304342 Authenticates requests using the x-rh-identity header with base64-encoded JSON.
305- Supports both User and System identity types with optional entitlement validation.
343+ Supports User, System, and ServiceAccount identity types with optional
344+ entitlement validation.
306345 """
307346
308347 def __init__ (
0 commit comments