11"""Manage authentication flow for FastAPI endpoints with K8S/OCP."""
22
33import os
4- from pathlib import Path
5- from typing import Optional , Self
4+ from typing import Optional , Self , cast
65
76import kubernetes .client
87from fastapi import HTTPException , Request
@@ -80,19 +79,19 @@ def __new__(cls: type[Self]) -> Self:
8079 ce ,
8180 )
8281
83- k8s_config .host = (
84- configuration .authentication_configuration .k8s_cluster_api
85- or k8s_config .host
86- )
82+ k8s_api_url = configuration .authentication_configuration .k8s_cluster_api
83+ if k8s_api_url is not None :
84+ k8s_config .host = str (k8s_api_url )
8785 k8s_config .verify_ssl = (
8886 not configuration .authentication_configuration .skip_tls_verification
8987 )
90- k8s_config . ssl_ca_cert = (
88+ ca_cert_path = (
9189 configuration .authentication_configuration .k8s_ca_cert_path
92- if configuration .authentication_configuration .k8s_ca_cert_path
93- not in {None , Path ()}
94- else k8s_config .ssl_ca_cert
9590 )
91+ if ca_cert_path is not None :
92+ # Kubernetes client library has incomplete type stubs for ssl_ca_cert
93+ k8s_config .ssl_ca_cert = str (ca_cert_path ) # type: ignore[assignment]
94+ # else keep the default k8s_config.ssl_ca_cert
9695 api_client = kubernetes .client .ApiClient (k8s_config )
9796 cls ._api_client = api_client
9897 cls ._custom_objects_api = kubernetes .client .CustomObjectsApi (api_client )
@@ -101,7 +100,8 @@ def __new__(cls: type[Self]) -> Self:
101100 except Exception as e :
102101 logger .info ("Failed to initialize Kubernetes client: %s" , e )
103102 raise
104- return cls ._instance
103+ # At this point _instance is guaranteed to be initialized
104+ return cast (Self , cls ._instance )
105105
106106 @classmethod
107107 def get_authn_api (cls ) -> kubernetes .client .AuthenticationV1Api :
@@ -161,22 +161,30 @@ def _get_cluster_id(cls) -> str:
161161 """
162162 try :
163163 custom_objects_api = cls .get_custom_objects_api ()
164- version_data = custom_objects_api .get_cluster_custom_object (
165- "config.openshift.io" , "v1" , "clusterversions" , "version"
164+ # Kubernetes API always returns dict for custom objects
165+ version_data = cast (
166+ dict ,
167+ custom_objects_api .get_cluster_custom_object (
168+ "config.openshift.io" , "v1" , "clusterversions" , "version"
169+ ),
166170 )
167- cluster_id = version_data ["spec" ]["clusterID" ]
171+ spec = version_data .get ("spec" )
172+ if not isinstance (spec , dict ):
173+ raise ClusterIDUnavailableError (
174+ "Missing or invalid 'spec' in ClusterVersion"
175+ )
176+ cluster_id = spec .get ("clusterID" )
177+ if not isinstance (cluster_id , str ) or not cluster_id .strip ():
178+ raise ClusterIDUnavailableError (
179+ "Missing or invalid 'clusterID' in ClusterVersion"
180+ )
168181 cls ._cluster_id = cluster_id
169182 return cluster_id
170183 except KeyError as e :
171184 logger .error (
172185 "Failed to get cluster_id from cluster, missing keys in version object"
173186 )
174187 raise ClusterIDUnavailableError ("Failed to get cluster ID" ) from e
175- except TypeError as e :
176- logger .error (
177- "Failed to get cluster_id, version object is: %s" , version_data
178- )
179- raise ClusterIDUnavailableError ("Failed to get cluster ID" ) from e
180188 except ApiException as e :
181189 logger .error ("API exception during ClusterInfo: %s" , e )
182190 raise ClusterIDUnavailableError ("Failed to get cluster ID" ) from e
@@ -212,14 +220,14 @@ def get_cluster_id(cls) -> str:
212220 return cls ._cluster_id
213221
214222
215- def get_user_info (token : str ) -> Optional [kubernetes .client .V1TokenReview ]:
223+ def get_user_info (token : str ) -> Optional [kubernetes .client .V1TokenReviewStatus ]:
216224 """Perform a Kubernetes TokenReview to validate a given token.
217225
218226 Parameters:
219227 token: The bearer token to be validated.
220228
221229 Returns:
222- The user information if the token is valid, None otherwise.
230+ The V1TokenReviewStatus if the token is valid, None otherwise.
223231
224232 Raises:
225233 HTTPException: If unable to connect to Kubernetes API or unexpected error occurs.
@@ -238,9 +246,13 @@ def get_user_info(token: str) -> Optional[kubernetes.client.V1TokenReview]:
238246 spec = kubernetes .client .V1TokenReviewSpec (token = token )
239247 )
240248 try :
241- response = auth_api .create_token_review (token_review )
242- if response .status .authenticated :
243- return response .status
249+ response = cast (
250+ kubernetes .client .V1TokenReview ,
251+ auth_api .create_token_review (token_review ),
252+ )
253+ status = response .status
254+ if status is not None and status .authenticated :
255+ return status
244256 return None
245257 except Exception as e : # pylint: disable=broad-exception-caught
246258 logger .error ("API exception during TokenReview: %s" , e )
@@ -307,9 +319,12 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
307319 response = UnauthorizedResponse (cause = "Invalid or expired Kubernetes token" )
308320 raise HTTPException (** response .model_dump ())
309321
310- if user_info .user .username == "kube:admin" :
322+ # Cast user to proper type for type checking
323+ user = cast (kubernetes .client .V1UserInfo , user_info .user )
324+
325+ if user .username == "kube:admin" :
311326 try :
312- user_info . user .uid = K8sClientSingleton .get_cluster_id ()
327+ user .uid = K8sClientSingleton .get_cluster_id ()
313328 except ClusterIDUnavailableError as e :
314329 logger .error ("Failed to get cluster ID: %s" , e )
315330 response = InternalServerErrorResponse (
@@ -322,14 +337,17 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
322337 authorization_api = K8sClientSingleton .get_authz_api ()
323338 sar = kubernetes .client .V1SubjectAccessReview (
324339 spec = kubernetes .client .V1SubjectAccessReviewSpec (
325- user = user_info . user .username ,
326- groups = user_info . user .groups ,
340+ user = user .username ,
341+ groups = user .groups ,
327342 non_resource_attributes = kubernetes .client .V1NonResourceAttributes (
328343 path = self .virtual_path , verb = "get"
329344 ),
330345 )
331346 )
332- response = authorization_api .create_subject_access_review (sar )
347+ sar_response = cast (
348+ kubernetes .client .V1SubjectAccessReview ,
349+ authorization_api .create_subject_access_review (sar ),
350+ )
333351
334352 except Exception as e :
335353 logger .error ("API exception during SubjectAccessReview: %s" , e )
@@ -339,13 +357,19 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
339357 )
340358 raise HTTPException (** response .model_dump ()) from e
341359
342- if not response .status .allowed :
343- response = ForbiddenResponse .endpoint (user_id = user_info .user .uid )
360+ sar_status = cast (
361+ kubernetes .client .V1SubjectAccessReviewStatus , sar_response .status
362+ )
363+ user_uid = cast (str , user .uid )
364+ username = cast (str , user .username )
365+
366+ if not sar_status .allowed :
367+ response = ForbiddenResponse .endpoint (user_id = user_uid )
344368 raise HTTPException (** response .model_dump ())
345369
346370 return (
347- user_info . user . uid ,
348- user_info . user . username ,
371+ user_uid ,
372+ username ,
349373 self .skip_userid_check ,
350374 token ,
351375 )
0 commit comments