11"""Manage authentication flow for FastAPI endpoints with K8S/OCP."""
22
33import os
4- from pathlib import Path
5- from typing import Optional , Self
4+ from typing import Optional , Self , cast
65
76import kubernetes .client
87from fastapi import HTTPException , Request
@@ -80,19 +79,19 @@ def __new__(cls: type[Self]) -> Self:
8079 ce ,
8180 )
8281
83- k8s_config .host = (
84- configuration .authentication_configuration .k8s_cluster_api
85- or k8s_config .host
86- )
82+ k8s_api_url = configuration .authentication_configuration .k8s_cluster_api
83+ if k8s_api_url :
84+ k8s_config .host = str (k8s_api_url )
8785 k8s_config .verify_ssl = (
8886 not configuration .authentication_configuration .skip_tls_verification
8987 )
90- k8s_config . ssl_ca_cert = (
88+ ca_cert_path = (
9189 configuration .authentication_configuration .k8s_ca_cert_path
92- if configuration .authentication_configuration .k8s_ca_cert_path
93- not in {None , Path ()}
94- else k8s_config .ssl_ca_cert
9590 )
91+ if ca_cert_path :
92+ # Kubernetes client library has incomplete type stubs for ssl_ca_cert
93+ k8s_config .ssl_ca_cert = str (ca_cert_path ) # type: ignore[assignment]
94+ # else keep the default k8s_config.ssl_ca_cert
9695 api_client = kubernetes .client .ApiClient (k8s_config )
9796 cls ._api_client = api_client
9897 cls ._custom_objects_api = kubernetes .client .CustomObjectsApi (api_client )
@@ -101,7 +100,8 @@ def __new__(cls: type[Self]) -> Self:
101100 except Exception as e :
102101 logger .info ("Failed to initialize Kubernetes client: %s" , e )
103102 raise
104- return cls ._instance
103+ # At this point _instance is guaranteed to be initialized
104+ return cast (Self , cls ._instance )
105105
106106 @classmethod
107107 def get_authn_api (cls ) -> kubernetes .client .AuthenticationV1Api :
@@ -161,22 +161,30 @@ def _get_cluster_id(cls) -> str:
161161 """
162162 try :
163163 custom_objects_api = cls .get_custom_objects_api ()
164- version_data = custom_objects_api .get_cluster_custom_object (
165- "config.openshift.io" , "v1" , "clusterversions" , "version"
164+ # Kubernetes API always returns dict for custom objects
165+ version_data = cast (
166+ dict ,
167+ custom_objects_api .get_cluster_custom_object (
168+ "config.openshift.io" , "v1" , "clusterversions" , "version"
169+ ),
166170 )
167- cluster_id = version_data ["spec" ]["clusterID" ]
171+ spec = version_data .get ("spec" )
172+ if not isinstance (spec , dict ):
173+ raise ClusterIDUnavailableError (
174+ "Missing or invalid 'spec' in ClusterVersion"
175+ )
176+ cluster_id = spec .get ("clusterID" )
177+ if not isinstance (cluster_id , str ) or not cluster_id .strip ():
178+ raise ClusterIDUnavailableError (
179+ "Missing or invalid 'clusterID' in ClusterVersion"
180+ )
168181 cls ._cluster_id = cluster_id
169182 return cluster_id
170183 except KeyError as e :
171184 logger .error (
172185 "Failed to get cluster_id from cluster, missing keys in version object"
173186 )
174187 raise ClusterIDUnavailableError ("Failed to get cluster ID" ) from e
175- except TypeError as e :
176- logger .error (
177- "Failed to get cluster_id, version object is: %s" , version_data
178- )
179- raise ClusterIDUnavailableError ("Failed to get cluster ID" ) from e
180188 except ApiException as e :
181189 logger .error ("API exception during ClusterInfo: %s" , e )
182190 raise ClusterIDUnavailableError ("Failed to get cluster ID" ) from e
@@ -212,14 +220,14 @@ def get_cluster_id(cls) -> str:
212220 return cls ._cluster_id
213221
214222
215- def get_user_info (token : str ) -> Optional [kubernetes .client .V1TokenReview ]:
223+ def get_user_info (token : str ) -> Optional [kubernetes .client .V1TokenReviewStatus ]:
216224 """Perform a Kubernetes TokenReview to validate a given token.
217225
218226 Parameters:
219227 token: The bearer token to be validated.
220228
221229 Returns:
222- The user information if the token is valid, None otherwise.
230+ The V1TokenReviewStatus if the token is valid, None otherwise.
223231
224232 Raises:
225233 HTTPException: If unable to connect to Kubernetes API or unexpected error occurs.
@@ -238,9 +246,13 @@ def get_user_info(token: str) -> Optional[kubernetes.client.V1TokenReview]:
238246 spec = kubernetes .client .V1TokenReviewSpec (token = token )
239247 )
240248 try :
241- response = auth_api .create_token_review (token_review )
242- if response .status .authenticated :
243- return response .status
249+ response = cast (
250+ kubernetes .client .V1TokenReview ,
251+ auth_api .create_token_review (token_review ),
252+ )
253+ status = response .status
254+ if status is not None and status .authenticated :
255+ return status
244256 return None
245257 except Exception as e : # pylint: disable=broad-exception-caught
246258 logger .error ("API exception during TokenReview: %s" , e )
@@ -307,9 +319,12 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
307319 response = UnauthorizedResponse (cause = "Invalid or expired Kubernetes token" )
308320 raise HTTPException (** response .model_dump ())
309321
310- if user_info .user .username == "kube:admin" :
322+ # Cast user to proper type for type checking
323+ user = cast (kubernetes .client .V1UserInfo , user_info .user )
324+
325+ if user .username == "kube:admin" :
311326 try :
312- user_info . user .uid = K8sClientSingleton .get_cluster_id ()
327+ user .uid = K8sClientSingleton .get_cluster_id ()
313328 except ClusterIDUnavailableError as e :
314329 logger .error ("Failed to get cluster ID: %s" , e )
315330 response = InternalServerErrorResponse (
@@ -318,18 +333,31 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
318333 )
319334 raise HTTPException (** response .model_dump ()) from e
320335
336+ # Validate that uid is present and is a string
337+ user_uid = user .uid
338+ if not isinstance (user_uid , str ) or not user_uid :
339+ logger .error ("Authenticated Kubernetes user is missing a UID" )
340+ response = InternalServerErrorResponse (
341+ response = "Internal server error" ,
342+ cause = "Authenticated Kubernetes user is missing a UID" ,
343+ )
344+ raise HTTPException (** response .model_dump ())
345+
321346 try :
322347 authorization_api = K8sClientSingleton .get_authz_api ()
323348 sar = kubernetes .client .V1SubjectAccessReview (
324349 spec = kubernetes .client .V1SubjectAccessReviewSpec (
325- user = user_info . user .username ,
326- groups = user_info . user .groups ,
350+ user = user .username ,
351+ groups = user .groups ,
327352 non_resource_attributes = kubernetes .client .V1NonResourceAttributes (
328353 path = self .virtual_path , verb = "get"
329354 ),
330355 )
331356 )
332- response = authorization_api .create_subject_access_review (sar )
357+ sar_response = cast (
358+ kubernetes .client .V1SubjectAccessReview ,
359+ authorization_api .create_subject_access_review (sar ),
360+ )
333361
334362 except Exception as e :
335363 logger .error ("API exception during SubjectAccessReview: %s" , e )
@@ -339,13 +367,17 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
339367 )
340368 raise HTTPException (** response .model_dump ()) from e
341369
342- if not response .status .allowed :
343- response = ForbiddenResponse .endpoint (user_id = user_info .user .uid )
370+ sar_status = cast (
371+ kubernetes .client .V1SubjectAccessReviewStatus , sar_response .status
372+ )
373+ if not sar_status .allowed :
374+ response = ForbiddenResponse .endpoint (user_id = user_uid )
344375 raise HTTPException (** response .model_dump ())
345376
377+ username = cast (str , user .username )
346378 return (
347- user_info . user . uid ,
348- user_info . user . username ,
379+ user_uid ,
380+ username ,
349381 self .skip_userid_check ,
350382 token ,
351383 )
0 commit comments