22
33import os
44from pathlib import Path
5- from typing import Optional , Self
5+ from typing import Optional , Self , cast
66
77import kubernetes .client
88from fastapi import HTTPException , Request
@@ -80,19 +80,19 @@ def __new__(cls: type[Self]) -> Self:
8080 ce ,
8181 )
8282
83- k8s_config .host = (
84- configuration .authentication_configuration .k8s_cluster_api
85- or k8s_config .host
86- )
83+ k8s_api_url = configuration .authentication_configuration .k8s_cluster_api
84+ if k8s_api_url :
85+ k8s_config .host = str (k8s_api_url )
8786 k8s_config .verify_ssl = (
8887 not configuration .authentication_configuration .skip_tls_verification
8988 )
90- k8s_config . ssl_ca_cert = (
89+ ca_cert_path = (
9190 configuration .authentication_configuration .k8s_ca_cert_path
92- if configuration .authentication_configuration .k8s_ca_cert_path
93- not in {None , Path ()}
94- else k8s_config .ssl_ca_cert
9591 )
92+ if ca_cert_path and ca_cert_path != Path ():
93+ # Kubernetes client library has incomplete type stubs for ssl_ca_cert
94+ k8s_config .ssl_ca_cert = str (ca_cert_path ) # type: ignore[assignment]
95+ # else keep the default k8s_config.ssl_ca_cert
9696 api_client = kubernetes .client .ApiClient (k8s_config )
9797 cls ._api_client = api_client
9898 cls ._custom_objects_api = kubernetes .client .CustomObjectsApi (api_client )
@@ -101,7 +101,8 @@ def __new__(cls: type[Self]) -> Self:
101101 except Exception as e :
102102 logger .info ("Failed to initialize Kubernetes client: %s" , e )
103103 raise
104- return cls ._instance
104+ # At this point _instance is guaranteed to be initialized
105+ return cast (Self , cls ._instance )
105106
106107 @classmethod
107108 def get_authn_api (cls ) -> kubernetes .client .AuthenticationV1Api :
@@ -159,12 +160,28 @@ def _get_cluster_id(cls) -> str:
159160 ClusterIDUnavailableError: If the cluster ID cannot be obtained due
160161 to missing keys, an API error, or any unexpected error.
161162 """
163+ version_data = None
162164 try :
163165 custom_objects_api = cls .get_custom_objects_api ()
164166 version_data = custom_objects_api .get_cluster_custom_object (
165167 "config.openshift.io" , "v1" , "clusterversions" , "version"
166168 )
167- cluster_id = version_data ["spec" ]["clusterID" ]
169+ # Type validation: ensure we got a dict-like object
170+ if not isinstance (version_data , dict ):
171+ raise TypeError (
172+ f"Expected dict for version_data, got { type (version_data )} "
173+ )
174+ # Kubernetes client library returns untyped dict-like objects
175+ spec = version_data .get ("spec" )
176+ if not isinstance (spec , dict ):
177+ raise ClusterIDUnavailableError (
178+ "Missing or invalid 'spec' in ClusterVersion"
179+ )
180+ cluster_id = spec .get ("clusterID" )
181+ if not isinstance (cluster_id , str ) or not cluster_id .strip ():
182+ raise ClusterIDUnavailableError (
183+ "Missing or invalid 'clusterID' in ClusterVersion"
184+ )
168185 cls ._cluster_id = cluster_id
169186 return cluster_id
170187 except KeyError as e :
@@ -212,14 +229,14 @@ def get_cluster_id(cls) -> str:
212229 return cls ._cluster_id
213230
214231
215- def get_user_info (token : str ) -> Optional [kubernetes .client .V1TokenReview ]:
232+ def get_user_info (token : str ) -> Optional [kubernetes .client .V1TokenReviewStatus ]:
216233 """Perform a Kubernetes TokenReview to validate a given token.
217234
218235 Parameters:
219236 token: The bearer token to be validated.
220237
221238 Returns:
222- The user information if the token is valid, None otherwise.
239+ The V1TokenReviewStatus if the token is valid, None otherwise.
223240
224241 Raises:
225242 HTTPException: If unable to connect to Kubernetes API or unexpected error occurs.
@@ -239,8 +256,10 @@ def get_user_info(token: str) -> Optional[kubernetes.client.V1TokenReview]:
239256 )
240257 try :
241258 response = auth_api .create_token_review (token_review )
242- if response .status .authenticated :
243- return response .status
259+ # Kubernetes client library has incomplete type stubs
260+ status = response .status # type: ignore[union-attr]
261+ if status is not None and status .authenticated :
262+ return status
244263 return None
245264 except Exception as e : # pylint: disable=broad-exception-caught
246265 logger .error ("API exception during TokenReview: %s" , e )
@@ -307,9 +326,10 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
307326 response = UnauthorizedResponse (cause = "Invalid or expired Kubernetes token" )
308327 raise HTTPException (** response .model_dump ())
309328
310- if user_info .user .username == "kube:admin" :
329+ # Kubernetes client library has incomplete type stubs
330+ if user_info .user .username == "kube:admin" : # type: ignore[union-attr]
311331 try :
312- user_info .user .uid = K8sClientSingleton .get_cluster_id ()
332+ user_info .user .uid = K8sClientSingleton .get_cluster_id () # type: ignore[union-attr]
313333 except ClusterIDUnavailableError as e :
314334 logger .error ("Failed to get cluster ID: %s" , e )
315335 response = InternalServerErrorResponse (
@@ -318,12 +338,22 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
318338 )
319339 raise HTTPException (** response .model_dump ()) from e
320340
341+ # Validate that uid is present and is a string
342+ user_uid = user_info .user .uid # type: ignore[union-attr]
343+ if not isinstance (user_uid , str ) or not user_uid :
344+ logger .error ("Authenticated Kubernetes user is missing a UID" )
345+ response = InternalServerErrorResponse (
346+ response = "Internal server error" ,
347+ cause = "Authenticated Kubernetes user is missing a UID" ,
348+ )
349+ raise HTTPException (** response .model_dump ())
350+
321351 try :
322352 authorization_api = K8sClientSingleton .get_authz_api ()
323353 sar = kubernetes .client .V1SubjectAccessReview (
324354 spec = kubernetes .client .V1SubjectAccessReviewSpec (
325- user = user_info .user .username ,
326- groups = user_info .user .groups ,
355+ user = user_info .user .username , # type: ignore[union-attr]
356+ groups = user_info .user .groups , # type: ignore[union-attr]
327357 non_resource_attributes = kubernetes .client .V1NonResourceAttributes (
328358 path = self .virtual_path , verb = "get"
329359 ),
@@ -339,13 +369,14 @@ async def __call__(self, request: Request) -> tuple[str, str, bool, str]:
339369 )
340370 raise HTTPException (** response .model_dump ()) from e
341371
342- if not response .status .allowed :
343- response = ForbiddenResponse .endpoint (user_id = user_info .user .uid )
372+ # Kubernetes client library has incomplete type stubs
373+ if not response .status .allowed : # type: ignore[union-attr]
374+ response = ForbiddenResponse .endpoint (user_id = user_uid )
344375 raise HTTPException (** response .model_dump ())
345376
346377 return (
347- user_info . user . uid ,
348- user_info .user .username ,
378+ user_uid ,
379+ user_info .user .username , # type: ignore[union-attr]
349380 self .skip_userid_check ,
350381 token ,
351382 )
0 commit comments