11"""Manage authentication flow for FastAPI endpoints with K8S/OCP."""
22
33import os
4- from pathlib import Path
5- from typing import Optional , Self
4+ from typing import Optional , Self , cast
65
76import kubernetes .client
87from fastapi import HTTPException , Request
@@ -80,19 +79,19 @@ def __new__(cls: type[Self]) -> Self:
8079 ce ,
8180 )
8281
83- k8s_config .host = (
84- configuration .authentication_configuration .k8s_cluster_api
85- or k8s_config .host
86- )
82+ k8s_api_url = configuration .authentication_configuration .k8s_cluster_api
83+ if k8s_api_url :
84+ k8s_config .host = str (k8s_api_url )
8785 k8s_config .verify_ssl = (
8886 not configuration .authentication_configuration .skip_tls_verification
8987 )
90- k8s_config . ssl_ca_cert = (
88+ ca_cert_path = (
9189 configuration .authentication_configuration .k8s_ca_cert_path
92- if configuration .authentication_configuration .k8s_ca_cert_path
93- not in {None , Path ()}
94- else k8s_config .ssl_ca_cert
9590 )
91+ if ca_cert_path :
92+ # Kubernetes client library has incomplete type stubs for ssl_ca_cert
93+ k8s_config .ssl_ca_cert = str (ca_cert_path ) # type: ignore[assignment]
94+ # else keep the default k8s_config.ssl_ca_cert
9695 api_client = kubernetes .client .ApiClient (k8s_config )
9796 cls ._api_client = api_client
9897 cls ._custom_objects_api = kubernetes .client .CustomObjectsApi (api_client )
@@ -101,7 +100,8 @@ def __new__(cls: type[Self]) -> Self:
101100 except Exception as e :
102101 logger .info ("Failed to initialize Kubernetes client: %s" , e )
103102 raise
104- return cls ._instance
103+ # At this point _instance is guaranteed to be initialized
104+ return cast (Self , cls ._instance )
105105
106106 @classmethod
107107 def get_authn_api (cls ) -> kubernetes .client .AuthenticationV1Api :
@@ -161,22 +161,30 @@ def _get_cluster_id(cls) -> str:
161161 """
162162 try :
163163 custom_objects_api = cls .get_custom_objects_api ()
164- version_data = custom_objects_api .get_cluster_custom_object (
165- "config.openshift.io" , "v1" , "clusterversions" , "version"
164+ # Kubernetes API always returns dict for custom objects
165+ version_data = cast (
166+ dict ,
167+ custom_objects_api .get_cluster_custom_object (
168+ "config.openshift.io" , "v1" , "clusterversions" , "version"
169+ ),
166170 )
167- cluster_id = version_data ["spec" ]["clusterID" ]
171+ spec = version_data .get ("spec" )
172+ if not isinstance (spec , dict ):
173+ raise ClusterIDUnavailableError (
174+ "Missing or invalid 'spec' in ClusterVersion"
175+ )
176+ cluster_id = spec .get ("clusterID" )
177+ if not isinstance (cluster_id , str ) or not cluster_id .strip ():
178+ raise ClusterIDUnavailableError (
179+ "Missing or invalid 'clusterID' in ClusterVersion"
180+ )
168181 cls ._cluster_id = cluster_id
169182 return cluster_id
170183 except KeyError as e :
171184 logger .error (
172185 "Failed to get cluster_id from cluster, missing keys in version object"
173186 )
174187 raise ClusterIDUnavailableError ("Failed to get cluster ID" ) from e
175- except TypeError as e :
176- logger .error (
177- "Failed to get cluster_id, version object is: %s" , version_data
178- )
179- raise ClusterIDUnavailableError ("Failed to get cluster ID" ) from e
180188 except ApiException as e :
181189 logger .error ("API exception during ClusterInfo: %s" , e )
182190 raise ClusterIDUnavailableError ("Failed to get cluster ID" ) from e
@@ -212,14 +220,14 @@ def get_cluster_id(cls) -> str:
212220 return cls ._cluster_id
213221
214222
215- def get_user_info (token : str ) -> Optional [kubernetes .client .V1TokenReview ]:
223+ def get_user_info (token : str ) -> Optional [kubernetes .client .V1TokenReviewStatus ]:
216224 """Perform a Kubernetes TokenReview to validate a given token.
217225
218226 Parameters:
219227 token: The bearer token to be validated.
220228
221229 Returns:
222- The user information if the token is valid, None otherwise.
230+ The V1TokenReviewStatus if the token is valid, None otherwise.
223231
224232 Raises:
225233 HTTPException: If unable to connect to Kubernetes API or unexpected error occurs.
@@ -239,8 +247,10 @@ def get_user_info(token: str) -> Optional[kubernetes.client.V1TokenReview]:
239247 )
240248 try :
241249 response = auth_api .create_token_review (token_review )
242- if response .status .authenticated :
243- return response .status
250+ # Kubernetes client library has incomplete type stubs
251+ status = response .status # type: ignore[union-attr]
252+ if status is not None and status .authenticated :
253+ return status
244254 return None
245255 except Exception as e : # pylint: disable=broad-exception-caught
246256 logger .error ("API exception during TokenReview: %s" , e )
@@ -307,9 +317,10 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
307317 response = UnauthorizedResponse (cause = "Invalid or expired Kubernetes token" )
308318 raise HTTPException (** response .model_dump ())
309319
310- if user_info .user .username == "kube:admin" :
320+ # Kubernetes client library has incomplete type stubs
321+ if user_info .user .username == "kube:admin" : # type: ignore[union-attr]
311322 try :
312- user_info .user .uid = K8sClientSingleton .get_cluster_id ()
323+ user_info .user .uid = K8sClientSingleton .get_cluster_id () # type: ignore[union-attr]
313324 except ClusterIDUnavailableError as e :
314325 logger .error ("Failed to get cluster ID: %s" , e )
315326 response = InternalServerErrorResponse (
@@ -318,12 +329,22 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
318329 )
319330 raise HTTPException (** response .model_dump ()) from e
320331
332+ # Validate that uid is present and is a string
333+ user_uid = user_info .user .uid # type: ignore[union-attr]
334+ if not isinstance (user_uid , str ) or not user_uid :
335+ logger .error ("Authenticated Kubernetes user is missing a UID" )
336+ response = InternalServerErrorResponse (
337+ response = "Internal server error" ,
338+ cause = "Authenticated Kubernetes user is missing a UID" ,
339+ )
340+ raise HTTPException (** response .model_dump ())
341+
321342 try :
322343 authorization_api = K8sClientSingleton .get_authz_api ()
323344 sar = kubernetes .client .V1SubjectAccessReview (
324345 spec = kubernetes .client .V1SubjectAccessReviewSpec (
325- user = user_info .user .username ,
326- groups = user_info .user .groups ,
346+ user = user_info .user .username , # type: ignore[union-attr]
347+ groups = user_info .user .groups , # type: ignore[union-attr]
327348 non_resource_attributes = kubernetes .client .V1NonResourceAttributes (
328349 path = self .virtual_path , verb = "get"
329350 ),
@@ -339,13 +360,14 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
339360 )
340361 raise HTTPException (** response .model_dump ()) from e
341362
342- if not response .status .allowed :
343- response = ForbiddenResponse .endpoint (user_id = user_info .user .uid )
363+ # Kubernetes client library has incomplete type stubs
364+ if not response .status .allowed : # type: ignore[union-attr]
365+ response = ForbiddenResponse .endpoint (user_id = user_uid )
344366 raise HTTPException (** response .model_dump ())
345367
346368 return (
347- user_info . user . uid ,
348- user_info .user .username ,
369+ user_uid ,
370+ user_info .user .username , # type: ignore[union-attr]
349371 self .skip_userid_check ,
350372 token ,
351373 )
0 commit comments